提交 78c1351f 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

failed to open TCP server socket, return error for RPC initialization

上级 f01aa040
......@@ -92,6 +92,7 @@ int32_t main(int32_t argc, char *argv[]) {
// Initialize the system
if (dnodeInitSystem() < 0) {
syslog(LOG_ERR, "Error initialize TDengine system");
dPrint("Failed to start TDengine, please check the log at:%s", tsLogDir);
closelog();
exit(EXIT_FAILURE);
}
......
......@@ -67,7 +67,7 @@ static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void* taosAcceptTcpConnection(void *arg);
static void *taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj;
......@@ -80,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL;
}
pServerObj->fd = -1;
pServerObj->thread = 0;
pServerObj->ip = ip;
pServerObj->port = port;
......@@ -99,6 +100,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// initialize parameters in case it may encounter error later
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->pollFd = -1;
......@@ -106,18 +108,21 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle;
pThreadObj++;
}
// initialize mutex, thread, fd which may fail
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) {
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1;
break;
}
......@@ -125,7 +130,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
......@@ -133,15 +137,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj++;
}
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) code = -1;
if (code == 0) {
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
}
}
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpTcpServer(pServerObj);
pServerObj = NULL;
} else {
......@@ -204,7 +211,7 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj);
}
static void* taosAcceptTcpConnection(void *arg) {
static void *taosAcceptTcpConnection(void *arg) {
int connFd = -1;
struct sockaddr_in caddr;
int threadId = 0;
......@@ -212,10 +219,6 @@ static void* taosAcceptTcpConnection(void *arg) {
SServerObj *pServerObj;
pServerObj = (SServerObj *)arg;
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) return NULL;
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) {
......
......@@ -19,6 +19,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "taosdef.h"
#include "taoserror.h"
#include "rpcLog.h"
#include "rpcUdp.h"
#include "rpcHead.h"
......@@ -65,6 +66,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pSet = (SUdpConnSet *)malloc((size_t)size);
if (pSet == NULL) {
tError("%s failed to allocate UdpConn", label);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
......@@ -73,30 +75,34 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pSet->port = port;
pSet->shandle = shandle;
pSet->fp = fp;
pSet->threads = threads;
tstrncpy(pSet->label, label, sizeof(pSet->label));
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int i;
uint16_t ownPort;
for (int i = 0; i < threads; ++i) {
for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i;
ownPort = (port ? port + i : 0);
pConn->fd = taosOpenUdpSocket(ip, ownPort);
if (pConn->fd < 0) {
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
taosCleanUpUdpConnection(pSet);
return NULL;
break;
}
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
if (NULL == pConn->buffer) {
tError("%s failed to malloc recv buffer", label);
taosCleanUpUdpConnection(pSet);
return NULL;
break;
}
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) {
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
......@@ -107,23 +113,22 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn->pSet = pSet;
pConn->signature = pConn;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
pthread_attr_destroy(&thAttr);
if (code != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet);
return NULL;
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
break;
}
++pSet->threads;
}
tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads);
pthread_attr_destroy(&thAttr);
if (i != threads) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpUdpConnection(pSet);
return NULL;
}
tTrace("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
return pSet;
}
......@@ -136,16 +141,17 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
pConn->signature = NULL;
// shutdown to signal the thread to exit
shutdown(pConn->fd, SHUT_RD);
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD);
}
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
pthread_join(pConn->thread, NULL);
free(pConn->buffer);
taosCloseSocket(pConn->fd);
tTrace("chandle:%p is closed", pConn);
if (pConn->thread) pthread_join(pConn->thread, NULL);
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
tfree(pConn->buffer);
tTrace("UDP chandle:%p is closed", pConn);
}
tfree(pSet);
......@@ -159,7 +165,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
SUdpConn *pConn = pSet->udpConn + pSet->index;
pConn->port = port;
tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, pConn->localPort);
tTrace("%s UDP connection is setup, ip:%x:%hu", pConn->label, ip, port);
return pConn;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册