/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include #include "os.h" #include "tglobal.h" #include "taoserror.h" #include "taosmsg.h" #include "tcache.h" #include "query.h" #include "trpc.h" #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" #include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { SVnodeObj *pVnode = (SVnodeObj *)param; int msgType = pReadMsg->rpcMsg.msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); return TSDB_CODE_VND_MSG_NOT_PROCESSED; } if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); return TSDB_CODE_VND_INVALID_STATUS; } // tsdb may be in reset state if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_RPC_NOT_READY; // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; } return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle, void* handle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; pRead->rpcMsg.handle = handle; atomic_add_fetch_32(&pVnode->refCount, 1); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; SRspRet *pRet = &pReadMsg->rspRet; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { assert(*qhandle == (void*) killQueryMsg->qhandle); qKillQuery(*qhandle); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); } return TSDB_CODE_TSC_QUERY_CANCELLED; } int32_t code = TSDB_CODE_SUCCESS; void** handle = NULL; if (contLen != 0) { qinfo_t pQInfo = NULL; code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; pRsp->qhandle = 0; pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; int32_t vgId = pVnode->vgId; // current connect is broken if (code == TSDB_CODE_SUCCESS) { // handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); handle = &pQInfo; if (handle == NULL) { // failed to register qhandle vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; qDestroyQueryInfo(pQInfo); // destroy it directly } else { assert(*handle == pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo); } // pQInfo = NULL; if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return pRsp->code; } } else { assert(pQInfo == NULL); } if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } } else { assert(pCont != NULL); void* p = (void*) pCont; handle = &p; // handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); if (handle == NULL) { vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); code = TSDB_CODE_QRY_INVALID_QHANDLE; } else { vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont); bool buildRes = qTableQuery(*handle); // do execute query if (buildRes) { // build result rsp SReadMsg* pRetrieveMsg = qGetResultRetrieveMsg(*handle); assert(pRetrieveMsg != NULL); vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pRetrieveMsg->rpcMsg.handle); pReadMsg->rpcMsg.handle = pRetrieveMsg->rpcMsg.handle; // update the connection info according to the retrieve connection pRet = &pReadMsg->rspRet; code = TSDB_CODE_QRY_HAS_RSP; bool continueExec = false; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); pRet->qhandle = *handle; code = TSDB_CODE_SUCCESS; } } else { // todo handle error } code = TSDB_CODE_QRY_HAS_RSP; } } // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } return code; } static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; SRspRet *pRet = &pReadMsg->rspRet; SRetrieveTableMsg *pRetrieve = pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->free = htons(pRetrieve->free); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, (void*) pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; void** handle = NULL; void* p1 = (void*) pRetrieve->qhandle; handle = &p1; // void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp* pRsp = pRet->rsp; pRsp->numOfRows = 0; pRsp->useconds = 0; pRsp->completed = true; return code; } if (pRetrieve->free == 1) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); qKillQuery(*handle); // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp* pRsp = pRet->rsp; pRsp->numOfRows = 0; pRsp->completed = true; pRsp->useconds = 0; return code; } bool freeHandle = true; bool buildRes = false; code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg); if (code != TSDB_CODE_SUCCESS) { //TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); } else { // result is not ready, return immediately if (!buildRes) { return TSDB_CODE_QRY_NOT_READY; } bool continueExec = false; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); pRet->qhandle = *handle; freeHandle = false; } } else { pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); } } UNUSED(freeHandle); // qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle); return code; } // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); killQueryMsg->qhandle = htobe64((uint64_t) qhandle); killQueryMsg->free = htons(1); killQueryMsg->header.vgId = htonl(vgId); killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); }