diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 2f9e9a0af9d6e573892a8e8cd01e3075b2a2bf7b..cd18ae6ddabfdfd093126dc84adf89591cf68648 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -26,13 +26,6 @@ #include "dnodeVRead.h" #include "vnode.h" -typedef struct { - SRspRet rspRet; - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; -} SReadMsg; - typedef struct { pthread_t thread; // thread int32_t workerId; // worker ID @@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) { } dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); - int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); + int32_t code = vnodeProcessRead(pVnode, pReadMsg); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 069f99263deeddfe0794437aa010b2b330dd4db9..0da1f51e27fc8ba882a6c7d60ccfdaf0245e83dc 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -34,6 +34,13 @@ typedef struct { void *qhandle; //used by query and retrieve msg } SRspRet; +typedef struct { + SRspRet rspRet; + void *pCont; + int32_t contLen; + SRpcMsg rpcMsg; +} SReadMsg; + int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); @@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); void vnodeBuildStatusMsg(void * param); -int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); +int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f198c2ffe417ae918b14e291b7390466c62b11a5..974697d057ef92968ae838c9708c732393505ea5 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -27,17 +27,18 @@ #include "vnodeLog.h" #include "query.h" -static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } -int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { +int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { SVnodeObj *pVnode = (SVnodeObj *)param; + int msgType = pReadMsg->rpcMsg.msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); @@ -55,10 +56,14 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, return TSDB_CODE_RPC_NOT_READY; } - return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { + void * pCont = pReadMsg->pCont; + int32_t contLen = pReadMsg->contLen; + SRspRet *pRet = &pReadMsg->rspRet; + SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; memset(pRet, 0, sizeof(SRspRet)); @@ -91,7 +96,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont return code; } -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { + void * pCont = pReadMsg->pCont; + SRspRet *pRet = &pReadMsg->rspRet; + SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet));