提交 27f52b89 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

fix #808

上级 e8623789
......@@ -1059,6 +1059,16 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
return msgLen;
}
void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn)
{
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
}
void taosProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param;
if (pConn->signature != param) {
......@@ -1074,22 +1084,20 @@ void taosProcessIdleTimer(void *param, void *tmrId) {
return;
}
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
reportDisc = 1;
}
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
}
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
......@@ -1114,11 +1122,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
pConn->meterId, pConn);
pConn->rspReceived = 1;
pConn->chandle = NULL;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
taosReportDisconnection(pChann, pConn);
}
tfree(data);
return NULL;
......@@ -1330,6 +1334,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
STaosHeader *pHeader = NULL;
SRpcConn * pConn = (SRpcConn *)param;
int msgLen;
int reportDisc = 0;
if (pConn->signature != param) {
tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
......@@ -1379,13 +1384,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->rspReceived == 0) {
pConn->rspReceived = 1;
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
reportDisc = 1;
}
}
}
......@@ -1397,6 +1396,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
}
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) {
......@@ -1443,22 +1443,19 @@ void taosStopRpcConn(void *thandle) {
tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn);
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex);
if (pConn->outType) {
pConn->rspReceived = 1;
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
reportDisc = 1;
pthread_mutex_unlock(&pChann->mutex);
taosScheduleTask(pChann->qhandle, &schedMsg);
} else {
pthread_mutex_unlock(&pChann->mutex);
taosCloseRpcConn(pConn);
}
if (reportDisc) taosReportDisconnection(pChann, pConn);
}
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册