/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" #include "taoserror.h" #include "tqueue.h" #include "trpc.h" #include "tsdb.h" #include "twal.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" #include "vnodeLog.h" #include "query.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { SVnodeObj *pVnode = (SVnodeObj *)param; if (vnodeProcessReadMsgFp[msgType] == NULL) { vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); return TSDB_CODE_VND_MSG_NOT_PROCESSED; } if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) { vTrace("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); return TSDB_CODE_VND_INVALID_VGROUP_ID; } // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vTrace("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; } return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); } static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; qinfo_t pQInfo = NULL; if (contLen != 0) { code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->code = code; pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { assert(pCont != NULL); pQInfo = pCont; code = TSDB_CODE_VND_ACTION_IN_PROGRESS; vTrace("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); } if (pQInfo != NULL) { vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo); qTableQuery(pQInfo); // do execute query } return code; } static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { //TODO pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); } else { // todo check code and handle error in build result set code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); if (qHasMoreResultsToRetrieve(pQInfo)) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; } else { // no further execution invoked, release the ref to vnode qDestroyQueryInfo(pQInfo); vnodeRelease(pVnode); } } vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); return code; }