提交 d43c0ab2 编写于 作者: S slguan

client can send msg to server

上级 5d965446
......@@ -177,47 +177,28 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
}
int tscSendMsgToServer(SSqlObj *pSql) {
uint8_t code = TSDB_CODE_NETWORK_UNAVAIL;
/*
* the total length of message
* rpc header + actual message body + digest
*
* the pSql object may be released automatically during insert procedure, in which the access of
* message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
*
*/
// the memory will be released by taosProcessResponse, so no memory leak here
char *pStart = rpcMallocCont(pSql->cmd.payloadLen);
if (NULL == pStart) {
char *pMsg = rpcMallocCont(pSql->cmd.payloadLen);
if (NULL == pMsg) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
if (pStart) {
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t signature = (uint64_t) pSql->signature;
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
if (pSql->cmd.command < TSDB_SQL_MGMT) {
rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
} else {
rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
tscTrace("%p send msg code:%d sig:%p", pSql, code, signature);
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
if (pSql->cmd.command < TSDB_SQL_MGMT) {
rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
} else {
rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
}
return code;
return TSDB_CODE_SUCCESS;
}
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
tscPrint("response is received, pCont:%p, code:%d", pCont, code);
SSqlObj *pSql = (SSqlObj *)ahandle;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
......@@ -237,50 +218,49 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
return;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
code == TSDB_CODE_TABLE_ID_MISMATCH) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
rpcFreeCont(pCont);
return;
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
rpcFreeCont(pCont);
return;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
if (pCont == NULL) {
code = TSDB_CODE_NETWORK_UNAVAIL;
} else {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
code == TSDB_CODE_TABLE_ID_MISMATCH) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
if (pCmd->command == TSDB_SQL_CONNECT) {
code = TSDB_CODE_NETWORK_UNAVAIL;
rpcFreeCont(pCont);
return;
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
rpcFreeCont(pCont);
return;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t)code; // keep the previous error code
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t) code; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta) {
tscSendMsgToServer(pSql);
rpcFreeCont(pCont);
return;
if (pMeterMetaInfo->pMeterMeta) {
tscSendMsgToServer(pSql);
rpcFreeCont(pCont);
return;
}
}
}
}
if (code != TSDB_CODE_SUCCESS){ // for other error set and return to invoker
rpcFreeCont(pCont);
return;
}
pSql->retry = 0;
if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem);
......@@ -2359,27 +2339,24 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); }
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMConnectMsg *pConnect;
char * pMsg, *pStart;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
pMsg = pCmd->payload + tsRpcHeadSize;
pStart = pMsg;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
pCmd->payloadLen = sizeof(SCMConnectMsg);
pConnect = (SCMConnectMsg *)pMsg;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload;
char *db; // ugly code to move the space
db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1;
strcpy(pConnect->db, db);
strcpy(pConnect->clientVersion, version);
pMsg += sizeof(SCMConnectMsg);
pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
strcpy(pConnect->msgVersion, "");
return TSDB_CODE_SUCCESS;
}
......
......@@ -64,11 +64,17 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(ip);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) {
if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
}
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 3;
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
}
......
......@@ -101,7 +101,7 @@ void taos_init_imp() {
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) {
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
}
......@@ -125,13 +125,13 @@ void taos_init_imp() {
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localIp = "0.0.0.0";//tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.connType = TAOS_CONN_CLIENT;
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
......@@ -139,13 +139,21 @@ void taos_init_imp() {
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localIp = "0.0.0.0";//tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 2000;
rpcInit.user = "root";
rpcInit.ckey = "key";
char secret[32] = {0};
taosEncryptPass((uint8_t *)"taosdata", strlen("taosdata"), secret);
rpcInit.secret = secret;
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
......@@ -319,10 +327,10 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
return -1;
}
// if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
// tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
// return -1;
// }
strncpy(tsSocketType, pStr, tListLen(tsSocketType));
cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
......
......@@ -121,10 +121,6 @@ void dnodeStartModulesImp() {
}
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
void (*dnodeStartModules)() = dnodeStartModulesImp;
......@@ -85,7 +85,7 @@ int32_t dnodeInitShell() {
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
tsDnodeShellServer = rpcOpen(&rpcInit);
......
......@@ -39,7 +39,7 @@ typedef struct {
uint32_t clientIp;
uint16_t clientPort;
uint32_t serverIp;
char *user;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct {
......
......@@ -81,6 +81,10 @@ struct arguments args = {
*/
int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */
//
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
printf("ok\n");
//
if (!checkVersion()) {
exit(EXIT_FAILURE);
......
......@@ -87,15 +87,17 @@ int32_t mgmtInitShell() {
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
if (numOfThreads < 1) numOfThreads = 1;
memset(&rpcInit, 0, sizeof(rpcInit));
//TODO
numOfThreads = 1;
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;;
rpcInit.localPort = tsMgmtShellPort;
rpcInit.label = "MND-shell";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = mgmtProcessMsgFromShell;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.afp = mgmtRetriveUserAuthInfo;
......@@ -1237,17 +1239,17 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr
return TSDB_CODE_INVALID_USER;
}
*spi = 1;
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle) {
static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) {
SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont;
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
rpcGetConnInfo(thandle, &connInfo);
int32_t code;
SUserObj *pUser = mgmtGetUser(connInfo.user);
......@@ -1311,17 +1313,15 @@ static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle
connect_over:
if (code != TSDB_CODE_SUCCESS) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(ahandle, code, NULL, 0);
rpcSendResponse(thandle, code, NULL, 0);
} else {
mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(ahandle, code, pConnectRsp, sizeof(pConnectRsp));
rpcSendResponse(thandle, code, pConnectRsp, sizeof(SCMConnectRsp));
}
rpcFreeCont(pCont);
return code;
}
/**
* check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one.
*/
......@@ -1354,6 +1354,7 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a
if (sdbGetRunStatus() != SDB_STATUS_SERVING) {
mTrace("shell msg is ignored since SDB is not ready");
rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0);
rpcFreeCont(pCont);
return;
}
......@@ -1366,6 +1367,7 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a
mError("%s from shell is not processed", taosMsg[(int8_t)type]);
}
}
rpcFreeCont(pCont);
}
void mgmtInitProcessShellMsg() {
......
......@@ -216,7 +216,8 @@ void monitorInitDatabaseCb(void *param, TAOS_RES *result, int code) {
if (-code == TSDB_CODE_TABLE_ALREADY_EXIST || -code == TSDB_CODE_DB_ALREADY_EXIST || code >= 0) {
monitorTrace("monitor:%p, sql success, code:%d, %s", monitor->conn, code, monitor->sql);
if (monitor->cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
taosLogFp = monitorSaveLog;
//TODO
//taosLogFp = monitorSaveLog;
taosLogSqlFp = monitorExecuteSQL;
taosLogAcctFp = monitorSaveAcctLog;
monitorLPrint("dnode:%s is started", tsPrivateIp);
......
......@@ -796,6 +796,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
pHead = rpcDecompressRpcMsg(pHead);
......
......@@ -59,6 +59,10 @@ int64_t sdbGetVersion() {
};
int32_t sdbGetRunStatus() {
if (!tsIsCluster) {
return SDB_STATUS_SERVING;
}
if (sdbInited == NULL) {
return SDB_STATUS_OFFLINE;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册