提交 14c90236 编写于 作者: S Shengliang Guan

TD-1057

上级 241f1732
......@@ -69,7 +69,7 @@ enum EPOLL_EVENTS {
#define EPOLL_CTL_DEL 3
typedef void* HANDLE;
typedef uintptr_t SOCKET;
//typedef uintptr_t SOCKET;
typedef union epoll_data {
void* ptr;
......@@ -92,14 +92,14 @@ extern "C" {
WEPOLL_EXPORT HANDLE epoll_create(int size);
WEPOLL_EXPORT HANDLE epoll_create1(int flags);
WEPOLL_EXPORT int epoll_close(HANDLE ephnd);
WEPOLL_EXPORT int epoll_close(SOCKET ephnd);
WEPOLL_EXPORT int epoll_ctl(HANDLE ephnd,
WEPOLL_EXPORT int epoll_ctl(SOCKET ephnd,
int op,
SOCKET sock,
struct epoll_event* event);
WEPOLL_EXPORT int epoll_wait(HANDLE ephnd,
WEPOLL_EXPORT int epoll_wait(SOCKET ephnd,
struct epoll_event* events,
int maxevents,
int timeout);
......
......@@ -89,14 +89,14 @@ extern "C" {
WEPOLL_EXPORT HANDLE epoll_create(int size);
WEPOLL_EXPORT HANDLE epoll_create1(int flags);
WEPOLL_EXPORT int epoll_close(HANDLE ephnd);
WEPOLL_EXPORT int epoll_close(SOCKET ephnd);
WEPOLL_EXPORT int epoll_ctl(HANDLE ephnd,
WEPOLL_EXPORT int epoll_ctl(SOCKET ephnd,
int op,
SOCKET sock,
struct epoll_event* event);
WEPOLL_EXPORT int epoll_wait(HANDLE ephnd,
WEPOLL_EXPORT int epoll_wait(SOCKET ephnd,
struct epoll_event* events,
int maxevents,
int timeout);
......@@ -593,7 +593,7 @@ HANDLE epoll_create1(int flags) {
return epoll__create();
}
int epoll_close(HANDLE ephnd) {
int epoll_close(SOCKET ephnd) {
ts_tree_node_t* tree_node;
port_state_t* port_state;
......@@ -614,11 +614,11 @@ int epoll_close(HANDLE ephnd) {
return port_delete(port_state);
err:
err_check_handle(ephnd);
err_check_handle((HANDLE)ephnd);
return -1;
}
int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
int epoll_ctl(SOCKET ephnd, int op, SOCKET sock, struct epoll_event* ev) {
ts_tree_node_t* tree_node;
port_state_t* port_state;
int r;
......@@ -645,12 +645,12 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
err:
/* On Linux, in the case of epoll_ctl(), EBADF takes priority over other
* errors. Wepoll mimics this behavior. */
err_check_handle(ephnd);
err_check_handle((HANDLE) ephnd);
err_check_handle((HANDLE) sock);
return -1;
}
int epoll_wait(HANDLE ephnd,
int epoll_wait(SOCKET ephnd,
struct epoll_event* events,
int maxevents,
int timeout) {
......@@ -681,7 +681,7 @@ int epoll_wait(HANDLE ephnd,
return num_events;
err:
err_check_handle(ephnd);
err_check_handle((HANDLE) ephnd);
return -1;
}
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/cq/inc)
LIST(APPEND CQTEST_SRC ./cqtest.c)
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
TARGET_LINK_LIBRARIES(cqtest tcq)
......@@ -2,10 +2,6 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
......
......@@ -32,6 +32,7 @@ extern "C" {
x = FD_INITIALIZER; \
} \
}
typedef int SOCKET;
#endif
#define taosClose(x) taosCloseSocket(x)
......@@ -54,11 +55,11 @@ extern "C" {
#endif
// TAOS_OS_FUNC_SOCKET
int taosSetNonblocking(int sock, int on);
int taosSetNonblocking(SOCKET sock, int on);
void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen);
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen);
#ifdef __cplusplus
}
......
......@@ -85,11 +85,15 @@ extern "C" {
#define TAOS_OS_FUNC_SOCKET
#define TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
#define TAOS_OS_FUNC_SOCKET_OP
#define taosSend(sockfd, buf, len, flags) send(sockfd, buf, len, flags)
#define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) sendto(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosWriteSocket(fd, buf, len) send(fd, buf, len, 0)
#define taosReadSocket(fd, buf, len) recv(fd, buf, len, 0)
#define taosCloseSocket(fd) closesocket(fd)
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
#define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) sendto((SOCKET)sockfd, buf, len, flags, dest_addr, addrlen)
#define taosWriteSocket(fd, buf, len) send((SOCKET)fd, buf, len, 0)
#define taosReadSocket(fd, buf, len) recv((SOCKET)fd, buf, len, 0)
#define taosCloseSocket(fd) closesocket((SOCKET)fd)
typedef SOCKET eventfd_t;
#define eventfd(a, b) -1
#define TAOS_OS_FUNC_STRING_WCHAR
int twcslen(const wchar_t *wcs);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
if (level == SOL_SOCKET && optname == SO_SNDBUF) {
return 0;
}
......
......@@ -19,7 +19,7 @@
#ifndef TAOS_OS_FUNC_SOCKET
int taosSetNonblocking(int sock, int on) {
int taosSetNonblocking(SOCKET sock, int on) {
int flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
......@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}
......
......@@ -34,7 +34,7 @@ void taosWinSocketInit() {
}
}
int taosSetNonblocking(int sock, int on) {
int taosSetNonblocking(SOCKET sock, int on) {
u_long mode;
if (on) {
mode = 1;
......@@ -48,7 +48,7 @@ int taosSetNonblocking(int sock, int on) {
void taosBlockSIGPIPE() {}
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0;
}
......
......@@ -21,6 +21,9 @@
#include "rpcLog.h"
#include "rpcHead.h"
#include "rpcTcp.h"
#ifdef WINDOWS
#include "wepoll.h"
#endif
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
......@@ -28,7 +31,7 @@
typedef struct SFdObj {
void *signature;
int fd; // TCP socket FD
SOCKET fd; // TCP socket FD
int closedByApp; // 1: already closed by App
void *thandle; // handle from upper layer, like TAOS
uint32_t ip;
......@@ -44,7 +47,7 @@ typedef struct SThreadObj {
pthread_mutex_t mutex;
uint32_t ip;
bool stop;
int pollFd;
SOCKET pollFd;
int numOfFds;
int threadId;
char label[TSDB_LABEL_LEN];
......@@ -53,7 +56,7 @@ typedef struct SThreadObj {
} SThreadObj;
typedef struct {
int fd;
SOCKET fd;
uint32_t ip;
uint16_t port;
char label[TSDB_LABEL_LEN];
......@@ -64,7 +67,7 @@ typedef struct {
} SServerObj;
static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj);
static void *taosAcceptTcpConnection(void *arg);
......@@ -120,7 +123,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
code = -1;
......@@ -163,7 +166,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
eventfd_t fd = -1;
if (pThreadObj->thread && pThreadObj->pollFd >=0) {
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
......@@ -179,9 +182,9 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL);
if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd);
if (fd != -1) close(fd);
if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL);
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
if (fd != -1) taosCloseSocket(fd);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
......@@ -195,7 +198,7 @@ void taosStopTcpServer(void *handle) {
if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL);
tDebug("%s TCP server is stopped", pServerObj->label);
}
......@@ -218,7 +221,7 @@ void taosCleanUpTcpServer(void *handle) {
}
static void *taosAcceptTcpConnection(void *arg) {
int connFd = -1;
SOCKET connFd = -1;
struct sockaddr_in caddr;
int threadId = 0;
SThreadObj *pThreadObj;
......@@ -252,7 +255,7 @@ static void *taosAcceptTcpConnection(void *arg) {
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
inet_ntoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else {
close(connFd);
taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
inet_ntoa(caddr.sin_addr), htons(caddr.sin_port));
}
......@@ -262,7 +265,7 @@ static void *taosAcceptTcpConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads;
}
close(pServerObj->fd);
taosCloseSocket(pServerObj->fd);
return NULL;
}
......@@ -283,7 +286,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label);
free(pThreadObj);
......@@ -298,7 +301,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) {
close(pThreadObj->pollFd);
taosCloseSocket(pThreadObj->pollFd);
free(pThreadObj);
terrno = TAOS_SYSTEM_ERROR(errno);
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
......@@ -330,7 +333,7 @@ void taosCleanUpTcpClient(void *chandle) {
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SThreadObj * pThreadObj = shandle;
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd < 0) return NULL;
struct sockaddr_in sin;
......@@ -350,7 +353,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else {
close(fd);
taosCloseSocket(fd);
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
}
......@@ -502,7 +505,7 @@ static void *taosProcessTcpData(void *param) {
return NULL;
}
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
struct epoll_event event;
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
......
......@@ -31,7 +31,7 @@
typedef struct {
int index;
int fd;
SOCKET fd;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
IF (TD_LINUX_64)
LIST(APPEND CLIENT_SRC ./rclient.c)
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
TARGET_LINK_LIBRARIES(rclient trpc)
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
IF (TD_LINUX_64)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND CLIENT_SRC ./syncClient.c)
......
......@@ -20,17 +20,17 @@
extern "C" {
#endif
int taosReadn(int sock, char *buffer, int len);
int taosWriteMsg(int fd, void *ptr, int nbytes);
int taosReadMsg(int fd, void *ptr, int nbytes);
int taosNonblockwrite(int fd, char *ptr, int nbytes);
int taosCopyFds(int sfd, int dfd, int64_t len);
int taosSetNonblocking(int sock, int on);
int taosReadn(SOCKET sock, char *buffer, int len);
int taosWriteMsg(SOCKET fd, void *ptr, int nbytes);
int taosReadMsg(SOCKET fd, void *ptr, int nbytes);
int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes);
int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len);
int taosSetNonblocking(SOCKET sock, int on);
int taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
int taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int taosKeepTcpAlive(int sockFd);
SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int taosKeepTcpAlive(SOCKET sockFd);
int taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *);
......
......@@ -90,7 +90,7 @@ uint32_t ip2uint(const char *const ip_addr) {
return *((unsigned int *)ip);
}
int taosWriteMsg(int fd, void *buf, int nbytes) {
int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
int nleft, nwritten;
char *ptr = (char *)buf;
......@@ -112,7 +112,7 @@ int taosWriteMsg(int fd, void *buf, int nbytes) {
return (nbytes - nleft);
}
int taosReadMsg(int fd, void *buf, int nbytes) {
int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
int nleft, nread;
char *ptr = (char *)buf;
......@@ -139,7 +139,7 @@ int taosReadMsg(int fd, void *buf, int nbytes) {
return (nbytes - nleft);
}
int taosNonblockwrite(int fd, char *ptr, int nbytes) {
int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
taosSetNonblocking(fd, 1);
int nleft, nwritten, nready;
......@@ -152,7 +152,7 @@ int taosNonblockwrite(int fd, char *ptr, int nbytes) {
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select(fd + 1, NULL, &fset, NULL, &tv)) == 0) {
if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout, no enough space to write", fd);
break;
......@@ -181,7 +181,7 @@ int taosNonblockwrite(int fd, char *ptr, int nbytes) {
return (nbytes - nleft);
}
int taosReadn(int fd, char *ptr, int nbytes) {
int taosReadn(SOCKET fd, char *ptr, int nbytes) {
int nread, nready, nleft = nbytes;
fd_set fset;
......@@ -192,7 +192,7 @@ int taosReadn(int fd, char *ptr, int nbytes) {
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select(fd + 1, NULL, &fset, NULL, &tv)) == 0) {
if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout\n", fd);
break;
......@@ -219,9 +219,9 @@ int taosReadn(int fd, char *ptr, int nbytes) {
return (nbytes - nleft);
}
int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr;
int sockFd;
SOCKET sockFd;
int bufSize = 1024000;
uDebug("open udp socket:0x%x:%hu", ip, port);
......@@ -238,32 +238,32 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
return sockFd;
}
int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
int sockFd = 0;
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SOCKET sockFd = 0;
struct sockaddr_in serverAddr, clientAddr;
int ret;
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd < 0) {
uError("failed to open the socket: %d (%s)", errno, strerror(errno));
......@@ -274,11 +274,11 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
int reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
};
if ( clientIp != 0) {
if (clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET;
clientAddr.sin_addr.s_addr = clientIp;
......@@ -288,7 +288,7 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)",
clientIp, destIp, destPort, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
}
......@@ -302,7 +302,7 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
if (ret != 0) {
//uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
sockFd = -1;
} else {
taosKeepTcpAlive(sockFd);
......@@ -311,39 +311,39 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
return sockFd;
}
int taosKeepTcpAlive(int sockFd) {
int taosKeepTcpAlive(SOCKET sockFd) {
int alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
int probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
int alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
int interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
int nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
......@@ -352,16 +352,16 @@ int taosKeepTcpAlive(int sockFd) {
linger.l_linger = 3;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
return 0;
}
int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
int sockFd;
SOCKET sockFd;
int reuse;
uDebug("open tcp server socket:0x%x:%hu", ip, port);
......@@ -380,26 +380,26 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
};
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
if (taosKeepTcpAlive(sockFd) < 0) {
uError("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
if (listen(sockFd, 10) < 0) {
uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
close(sockFd);
taosCloseSocket(sockFd);
return -1;
}
......@@ -413,7 +413,7 @@ void tinet_ntoa(char *ipstr, unsigned int ip) {
#define COPY_SIZE 32768
// sendfile shall be used
int taosCopyFds(int sfd, int dfd, int64_t len) {
int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len) {
int64_t leftLen;
int readLen, writeLen;
char temp[COPY_SIZE];
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
IF (TD_LINUX_64)
INCLUDE_DIRECTORIES(../inc)
LIST(APPEND WALTEST_SRC ./waltest.c)
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
IF (TD_LINUX_64)
add_executable(tdengineTest tdengineTest.c)
target_link_libraries(tdengineTest taos_static tutil common pthread)
ENDIF()
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_WINDOWS_64)
IF (TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
ENDIF ()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册