提交 4cce8fbb 编写于 作者: S slguan

dnode

上级 cdc3ce72
......@@ -27,6 +27,9 @@ void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, vo
void dnodeSendVpeerCfgMsg(int32_t vnode);
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
#ifdef __cplusplus
}
#endif
......
......@@ -56,6 +56,8 @@ void dnodeAllocModules();
int32_t dnodeInitModules();
void dnodeCleanUpModules();
extern void (*dnodeStartModules)();
#ifdef __cplusplus
}
#endif
......
......@@ -22,8 +22,6 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include "dnode.h"
typedef enum {
TSDB_DNODE_RUN_STATUS_INITIALIZE,
......@@ -41,7 +39,6 @@ extern void ** tsRpcQhandle;
extern void *tsQueryQhandle;
extern void *tsDnodeMgmtQhandle;
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
void dnodeInitPlugins();
......
......@@ -45,7 +45,7 @@ bool dnodeCheckVnodeExist(int32_t vid);
* Create vnode with specified configuration and open it
* if exist, config it
*/
void* dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg);
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg);
/*
* Remove vnode from local repository
......
......@@ -14,7 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tlog.h"
......@@ -28,13 +27,13 @@
#include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn);
static int32_t (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg();
void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) {
int8_t msgType = *(sched->msg - 1);
int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t));
int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t));
int8_t *pCont = sched->msg;
int32_t contLen = (int32_t) sched->ahandle;
void *pConn = NULL;
mgmtProcessMsgFromDnode(pCont, contLen, msgType, pConn);
......@@ -43,13 +42,13 @@ void dnodeSendMsgToMnodeImpFp(SSchedMsg *sched) {
int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
*(pCont-1) = msgType;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeImpFp;
schedMsg.msg = pCont;
SSchedMsg schedMsg;
schedMsg.fp = dnodeSendMsgToMnodeImpFp;
schedMsg.msg = pCont;
schedMsg.ahandle = (void*)contLen;
schedMsg.thandle = NULL;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return TSDB_CODE_SUCCESS;
......@@ -57,7 +56,7 @@ int32_t dnodeSendMsgToMnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType) = dnodeSendMsgToMnodeImp;
int32_t dnodeSendSimpleRspToMnodeImp(int32_t msgType, int32_t code) {
int32_t dnodeSendSimpleRspToMnodeImp(void *pConn, int32_t msgType, int32_t code) {
int8_t *pCont = rpcMallocCont(sizeof(int32_t));
*(int32_t *) pCont = code;
......@@ -65,7 +64,7 @@ int32_t dnodeSendSimpleRspToMnodeImp(int32_t msgType, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp;
int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code) = dnodeSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg();
......@@ -83,24 +82,24 @@ void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, vo
dError("invalid msg type:%d", msgType);
} else {
if (dnodeProcessShellMsgFp[msgType]) {
(*dnodeProcessShellMsgFp[msgType])(pCont, contLen, pConn);
(*dnodeProcessShellMsgFp[msgType])(pCont, contLen, msgType, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
}
}
int32_t dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
int32_t code = htonl(*((int32_t *) pMsg));
int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SDCreateTableMsg *table = (SDCreateTableMsg *) (pMsg + sizeof(int32_t));
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
return dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pMsg + sizeof(int32_t));
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
uint64_t uid = htobe64(table->uid);
SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
uint64_t uid = htobe64(table->uid);
dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
return dnodeDropTable(vnode, sid, uid);
} else {
......@@ -109,21 +108,21 @@ int32_t dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
}
}
int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDCreateTableMsg *table = (SDCreateTableMsg *) pCont;
int32_t code = dnodeCreateTable(table);
rpcSendSimpleRsp(pConn, code);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SDCreateTableMsg *table = (SDCreateTableMsg *) pCont;
int32_t code = dnodeCreateTable(table);
rpcSendSimpleRsp(pConn, code);
int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont;
int32_t code = dnodeCreateStream(stream);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont;
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
......@@ -131,11 +130,11 @@ int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, void *pCo
dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
int32_t code = dnodeDropTable(vnode, sid, uid);
rpcSendSimpleRsp(pConn, code);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
......@@ -153,33 +152,33 @@ int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, void *pConn) {
}
}
int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
dPrint("vnode:%d, start to config", vnode);
int32_t code = dnodeCreateVnode(vnode, vpeer);
rpcSendSimpleRsp(pConn, code);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
dPrint("vnode:%d, remove it", vnode);
int32_t code = dnodeDropVnode(vnode);
rpcSendSimpleRsp(pConn, code);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, void *pConn) {
int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCfgMsg *pCfg = (SCfgMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config);
rpcSendSimpleRsp(pConn, code);
dnodeSendSimpleRspToMnode(pConn, msgType + 1, code);
return code;
}
......@@ -190,7 +189,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
}
cfg->vnode = htonl(vnode);
dnodeSendMsgToMnode(cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG);
dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SVpeerCfgMsg), TSDB_MSG_TYPE_VNODE_CFG);
}
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
......@@ -200,20 +199,16 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
}
cfg->vnode = htonl(vnode);
dnodeSendMsgToMnode(cfg, sizeof(SMeterCfgMsg), TSDB_MSG_TYPE_TABLE_CFG);
dnodeSendMsgToMnode((int8_t*)cfg, sizeof(SMeterCfgMsg), TSDB_MSG_TYPE_TABLE_CFG);
}
void dnodeInitProcessShellMsg() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
}
\ No newline at end of file
......@@ -17,41 +17,41 @@
#include "os.h"
#include "tlog.h"
#include "tglobalcfg.h"
#include "mnode.h"
#include "http.h"
#include "monitor.h"
#include "dnodeModule.h"
#include "dnodeSystem.h"
#include "monitorSystem.h"
#include "httpSystem.h"
#include "mgmtSystem.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
}
......@@ -71,7 +71,7 @@ void dnodeCleanUpModules() {
}
void dnodeProcessModuleStatus(uint32_t status) {
if (tsDnodeRunStatus) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
return;
}
......@@ -112,7 +112,7 @@ int32_t dnodeInitModules() {
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
void dnodeStartModulesImp() {
......@@ -128,4 +128,5 @@ void dnodeStartModulesImp() {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
void (*dnodeStartModules)() = dnodeStartModulesImp;
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tsched.h"
#include "dnode.h"
#include "dnodeRead.h"
#include "dnodeSystem.h"
......@@ -32,14 +33,14 @@ void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_
}
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
//SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
void *pConn = pSched->ahandle;
//examples
int32_t code = TSDB_CODE_INVALID_QHANDLE;
void *pQInfo = NULL; //get from pConn
(*callback)(code, NULL, pConn);
(*callback)(code, pQInfo, pConn);
//TODO build response here
......@@ -47,8 +48,8 @@ static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
}
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(sizeof(pRetrieve));
memcpy(msg, pRetrieve, sizeof(pRetrieve));
int8_t *msg = malloc(sizeof(SRetrieveMeterMsg));
memcpy(msg, pRetrieve, sizeof(SRetrieveMeterMsg));
SSchedMsg schedMsg;
schedMsg.msg = msg;
......@@ -62,6 +63,8 @@ int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) {
return 0;
}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {
return 0;
}
......@@ -40,19 +40,19 @@ static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
assert(handle != NULL);
if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle);
return;
return NULL;
}
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY);
dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return;
return NULL;
}
dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]);
......@@ -66,6 +66,8 @@ void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, voi
} else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]);
}
return NULL;
}
int32_t dnodeInitShell() {
......
......@@ -19,6 +19,7 @@
#include "taoserror.h"
#include "tcrc32c.h"
#include "tlog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
......
......@@ -59,6 +59,8 @@ int32_t dnodeCreateStream(SAlterStreamMsg *stream) {
}
//TODO create or remove stream
return 0;
}
/*
......
......@@ -22,7 +22,6 @@ extern "C" {
#include <stdint.h>
#include <pthread.h>
#include "tsched.h"
typedef struct {
int32_t queryReqNum;
......@@ -48,10 +47,13 @@ extern int32_t (*dnodeCheckSystem)();
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)();
// dnodeMgmt
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
int32_t (*dnodeSendSimpleRspToMnode)(int32_t msgType, int32_t code);
// dnodeModule
extern void (*dnodeStartModules)();
// multilevelStorage
extern int32_t (*dnodeInitStorage)();
......@@ -59,9 +61,6 @@ extern void (*dnodeCleanupStorage)();
void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
void dnodeLockVnodes();
void dnodeUnLockVnodes();
SDnodeStatisInfo dnodeGetStatisInfo();
......
......@@ -20,11 +20,13 @@
extern "C" {
#endif
#include "tglobalcfg.h"
#include "tlog.h"
#include <stdint.h>
int32_t httpGetReqCount();
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
......
......@@ -336,16 +336,13 @@ typedef struct {
} SShowObj;
extern int32_t (*mgmtInitSystem)();
extern void (*mgmtStopSystem)();
//mgmtSystem
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void (*mgmtCleanUpRedirect)();
extern int32_t (*mgmtInitSystem)();
extern void (*mgmtStopSystem)();
extern void (*mgmtCleanUpRedirect)();
#ifdef __cplusplus
}
......
......@@ -13,11 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __MONITOR_H__
#define __MONITOR_H__
#ifndef TDENGINE_MONITOR_H
#define TDENGINE_MONITOR_H
#include "tglobalcfg.h"
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#endif
\ No newline at end of file
int32_t monitorInitSystem();
int32_t monitorStartSystem();
void monitorStopSystem();
void monitorCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
......@@ -28,118 +28,97 @@ extern "C" {
#include "taosdef.h"
// message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11
#define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13
#define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15
#define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17
#define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19
#define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23
#define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25
#define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31
#define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32
#define TSDB_MSG_TYPE_DNODE_VPEERS 33
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36
#define TSDB_MSG_TYPE_DNODE_CFG 37
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 38
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40
#define TSDB_MSG_TYPE_SDB_SYNC 41
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 42
#define TSDB_MSG_TYPE_SDB_FORWARD 43
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44
#define TSDB_MSG_TYPE_CONNECT 51
#define TSDB_MSG_TYPE_CONNECT_RSP 52
#define TSDB_MSG_TYPE_CREATE_ACCT 53
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54
#define TSDB_MSG_TYPE_ALTER_ACCT 55
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56
#define TSDB_MSG_TYPE_DROP_ACCT 57
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 58
#define TSDB_MSG_TYPE_CREATE_USER 59
#define TSDB_MSG_TYPE_CREATE_USER_RSP 60
#define TSDB_MSG_TYPE_ALTER_USER 61
#define TSDB_MSG_TYPE_ALTER_USER_RSP 62
#define TSDB_MSG_TYPE_DROP_USER 63
#define TSDB_MSG_TYPE_DROP_USER_RSP 64
#define TSDB_MSG_TYPE_CREATE_MNODE 65
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66
#define TSDB_MSG_TYPE_DROP_MNODE 67
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68
#define TSDB_MSG_TYPE_CREATE_DNODE 69
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70
#define TSDB_MSG_TYPE_DROP_DNODE 71
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 72
#define TSDB_MSG_TYPE_ALTER_DNODE 73
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74
#define TSDB_MSG_TYPE_CREATE_DB 75
#define TSDB_MSG_TYPE_CREATE_DB_RSP 76
#define TSDB_MSG_TYPE_DROP_DB 77
#define TSDB_MSG_TYPE_DROP_DB_RSP 78
#define TSDB_MSG_TYPE_USE_DB 79
#define TSDB_MSG_TYPE_USE_DB_RSP 80
#define TSDB_MSG_TYPE_ALTER_DB 81
#define TSDB_MSG_TYPE_ALTER_DB_RSP 82
#define TSDB_MSG_TYPE_CREATE_TABLE 83
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84
#define TSDB_MSG_TYPE_DROP_TABLE 85
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 86
#define TSDB_MSG_TYPE_ALTER_TABLE 87
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88
#define TSDB_MSG_TYPE_VNODE_CFG 89
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 90
#define TSDB_MSG_TYPE_TABLE_CFG 91
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92
#define TSDB_MSG_TYPE_TABLE_META 93
#define TSDB_MSG_TYPE_TABLE_META_RSP 94
#define TSDB_MSG_TYPE_STABLE_META 95
#define TSDB_MSG_TYPE_STABLE_META_RSP 96
#define TSDB_MSG_TYPE_MULTI_TABLE_META 97
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98
#define TSDB_MSG_TYPE_ALTER_STREAM 99
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100
#define TSDB_MSG_TYPE_SHOW 101
#define TSDB_MSG_TYPE_SHOW_RSP 102
#define TSDB_MSG_TYPE_CFG_MNODE 103
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 104
#define TSDB_MSG_TYPE_KILL_QUERY 105
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 106
#define TSDB_MSG_TYPE_KILL_STREAM 107
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 108
#define TSDB_MSG_TYPE_KILL_CONNECTION 109
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110
#define TSDB_MSG_TYPE_HEARTBEAT 111
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112
#define TSDB_MSG_TYPE_STATUS 113
#define TSDB_MSG_TYPE_STATUS_RSP 114
#define TSDB_MSG_TYPE_GRANT 115
#define TSDB_MSG_TYPE_GRANT_RSP 116
#define TSDB_MSG_TYPE_MAX 117
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_VPEERS 13
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_DNODE_CFG 17
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20
#define TSDB_MSG_TYPE_SDB_SYNC 21
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
#define TSDB_MSG_TYPE_SDB_FORWARD 23
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_ALTER_ACCT 35
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CREATE_USER 39
#define TSDB_MSG_TYPE_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_ALTER_USER 41
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_DROP_USER 43
#define TSDB_MSG_TYPE_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CREATE_MNODE 45
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46
#define TSDB_MSG_TYPE_DROP_MNODE 47
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48
#define TSDB_MSG_TYPE_CREATE_DNODE 49
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50
#define TSDB_MSG_TYPE_DROP_DNODE 51
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52
#define TSDB_MSG_TYPE_ALTER_DNODE 53
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54
#define TSDB_MSG_TYPE_CREATE_DB 55
#define TSDB_MSG_TYPE_CREATE_DB_RSP 56
#define TSDB_MSG_TYPE_DROP_DB 57
#define TSDB_MSG_TYPE_DROP_DB_RSP 58
#define TSDB_MSG_TYPE_USE_DB 59
#define TSDB_MSG_TYPE_USE_DB_RSP 60
#define TSDB_MSG_TYPE_ALTER_DB 61
#define TSDB_MSG_TYPE_ALTER_DB_RSP 62
#define TSDB_MSG_TYPE_CREATE_TABLE 63
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64
#define TSDB_MSG_TYPE_DROP_TABLE 65
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66
#define TSDB_MSG_TYPE_ALTER_TABLE 67
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68
#define TSDB_MSG_TYPE_VNODE_CFG 69
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70
#define TSDB_MSG_TYPE_TABLE_CFG 71
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72
#define TSDB_MSG_TYPE_TABLE_META 73
#define TSDB_MSG_TYPE_TABLE_META_RSP 74
#define TSDB_MSG_TYPE_STABLE_META 75
#define TSDB_MSG_TYPE_STABLE_META_RSP 76
#define TSDB_MSG_TYPE_MULTI_TABLE_META 77
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78
#define TSDB_MSG_TYPE_ALTER_STREAM 79
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80
#define TSDB_MSG_TYPE_SHOW 81
#define TSDB_MSG_TYPE_SHOW_RSP 82
#define TSDB_MSG_TYPE_CFG_MNODE 83
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84
#define TSDB_MSG_TYPE_KILL_QUERY 85
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86
#define TSDB_MSG_TYPE_KILL_STREAM 87
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88
#define TSDB_MSG_TYPE_KILL_CONNECTION 89
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90
#define TSDB_MSG_TYPE_HEARTBEAT 91
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92
#define TSDB_MSG_TYPE_STATUS 93
#define TSDB_MSG_TYPE_STATUS_RSP 94
#define TSDB_MSG_TYPE_GRANT 95
#define TSDB_MSG_TYPE_GRANT_RSP 96
#define TSDB_MSG_TYPE_MAX 97
// IE type
#define TSDB_IE_TYPE_SEC 1
......
......@@ -42,7 +42,7 @@ typedef struct {
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
void *(*fp)(char type, void *pCont, int contLen, void *handle, int index); // function to process the incoming msg
void *(*fp)(int8_t type, void *pCont, int32_t contLen, void *handle, int32_t index); // function to process the incoming msg
int sessions; // number of sessions allowed
int connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
......
......@@ -565,9 +565,9 @@ void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj
void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) {
int8_t msgType = *(sched->msg - 1);
int8_t msgType = *(int8_t *) (sched->msg - sizeof(int32_t) - sizeof(int8_t));
int32_t contLen = *(int32_t *) (sched->msg - sizeof(int8_t));
int8_t *pCont = sched->msg;
int32_t contLen = (int32_t) sched->ahandle;
void *pConn = NULL;
dnodeProcessMsgFromMgmt(pCont, contLen, msgType, pConn);
......@@ -576,13 +576,13 @@ void mgmtSendMsgToDnodeImpFp(SSchedMsg *sched) {
int32_t mgmtSendMsgToDnodeImp(int8_t *pCont, int32_t contLen, int8_t msgType) {
mTrace("msg:%s is sent to dnode", taosMsg[msgType]);
*(pCont-1) = msgType;
*(int8_t *) (pCont - sizeof(int32_t) - sizeof(int8_t)) = msgType;
*(int32_t *) (pCont - sizeof(int8_t)) = contLen;
SSchedMsg schedMsg = {0};
schedMsg.fp = mgmtSendMsgToDnodeImpFp;
schedMsg.msg = pCont;
SSchedMsg schedMsg;
schedMsg.fp = mgmtSendMsgToDnodeImpFp;
schedMsg.msg = pCont;
schedMsg.ahandle = (void*)contLen;
schedMsg.thandle = NULL;
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
return TSDB_CODE_SUCCESS;
......
......@@ -16,9 +16,19 @@
#ifndef TDENGINE_HTTP_SYSTEM_H
#define TDENGINE_HTTP_SYSTEM_H
int httpInitSystem();
int httpStartSystem();
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
......@@ -13,16 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __MONITOR_SYSTEM_H__
#define __MONITOR_SYSTEM_H__
#ifndef TDENGINE_MONITOR_SYSTEM_H
#define TDENGINE_MONITOR_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdbool.h>
#include <stdint.h>
int monitorInitSystem();
int monitorStartSystem();
int32_t monitorInitSystem();
int32_t monitorStartSystem();
void monitorStopSystem();
void monitorCleanUpSystem();
extern void (*mnodeCountRequestFp)(SDnodeStatisInfo *info);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
......@@ -26,30 +26,8 @@ char *taosMsg[] = {
"create-table",
"create-table-rsp", //10
"create-normal-table",
"create-normal-table-rsp",
"create-stream-table",
"create-stream-table-rsp",
"create-super-table",
"create-super-table-rsp",
"remove-table",
"remove-table-rsp",
"remove-normal-table",
"remove-normal-table-rsp", //20
"remove-stream-table",
"remove-stream-table-rsp",
"remove-super-table",
"remove-super-table-rsp",
"alter-table",
"alter-table-rsp",
"alter-normal-table",
"alter-normal-table-rsp",
"alter-stream-table",
"alter-stream-table-rsp", //30
"alter-super-table",
"alter-super-table-rsp",
"vpeers",
"vpeers-rsp",
"free-vnode",
......@@ -57,7 +35,7 @@ char *taosMsg[] = {
"cfg-dnode",
"cfg-dnode-rsp",
"alter-stream",
"alter-stream-rsp", //40
"alter-stream-rsp", //20
"sync",
"sync-rsp",
......@@ -68,7 +46,7 @@ char *taosMsg[] = {
"",
"",
"",
"", //50
"", //30
"connect",
"connect-rsp",
......@@ -79,7 +57,7 @@ char *taosMsg[] = {
"drop-acct",
"drop-acct-rsp",
"create-user",
"create-user-rsp", //60
"create-user-rsp", //40
"alter-user",
"alter-user-rsp",
......@@ -90,7 +68,7 @@ char *taosMsg[] = {
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp", //70
"create-dnode-rsp", //50
"drop-dnode",
"drop-dnode-rsp",
......@@ -101,7 +79,7 @@ char *taosMsg[] = {
"drop-db",
"drop-db-rsp",
"use-db",
"use-db-rsp", //80
"use-db-rsp", //60
"alter-db",
"alter-db-rsp",
......@@ -112,7 +90,7 @@ char *taosMsg[] = {
"alter-table",
"alter-table-rsp",
"cfg-vnode",
"cfg-vnode-rsp", //90
"cfg-vnode-rsp", //70
"cfg-table",
"cfg-table-rsp",
......@@ -123,7 +101,7 @@ char *taosMsg[] = {
"multi-table-meta",
"multi-table-meta-rsp",
"alter-stream",
"alter-stream-rsp", //100
"alter-stream-rsp", //80
"show",
"show-rsp",
......@@ -134,7 +112,7 @@ char *taosMsg[] = {
"kill-stream",
"kill-stream-rsp",
"kill-connection",
"kill-connectoin-rsp", //110
"kill-connectoin-rsp", //90
"heart-beat",
"heart-beat-rsp",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册