diff --git a/src/cluster.c b/src/cluster.c index 65d3be8c2bed60ded41bf1d8faa4e6d19696e2a9..2a2c8a386014f5aa55e52cdfd5fc1cf1754934d6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -42,6 +42,8 @@ POSIX_ONLY(#include ) POSIX_ONLY(#include ) #include +WIN32_ONLY(extern int WSIOCP_QueueAccept(int listenfd);) + /* A global reference to myself is handy to make code more clear. * Myself always points to server.cluster->myself, that is, the clusterNode * that represents this node. */ @@ -598,7 +600,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* If the server is starting up, don't accept cluster connections: * UPDATE messages may interact with the database content. */ - if (server.masterhost == NULL && server.loading) return; + if (server.masterhost == NULL && server.loading) { + WIN32_ONLY(WSIOCP_QueueAccept(fd);) + return; + } while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); @@ -606,6 +611,12 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (errno != EWOULDBLOCK) redisLog(REDIS_VERBOSE, "Error accepting cluster node: %s", server.neterr); +#ifdef _WIN32 + if (WSIOCP_QueueAccept(fd) == -1) { + redisLog(REDIS_WARNING, + "acceptTcpHandler: failed to queue another accept."); + } +#endif return; } anetNonBlock(NULL,cfd); @@ -1994,6 +2005,42 @@ void handleLinkIOError(clusterLink *link) { freeClusterLink(link); } +#ifdef _WIN32 +void clusterWriteDone(aeEventLoop *el, int fd, void *privdata, int written) { + WSIOCP_Request *req = (WSIOCP_Request *) privdata; + clusterLink *link = (clusterLink *) req->client; + REDIS_NOTUSED(el); + REDIS_NOTUSED(fd); + + if (sdslen(link->sndbuf) == written) { + sdsrange(link->sndbuf, written, -1); + aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); + redisLog(REDIS_WARNING, "clusterWriteDone written %d fd %d", written, link->fd); + } +} + +void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + clusterLink *link = (clusterLink*) privdata; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + + int result = WSIOCP_SocketSend(fd, + (char*) link->sndbuf, + (int) (sdslen(link->sndbuf)), + el, + link, + NULL, + clusterWriteDone); + if (errno == WSA_IO_PENDING) + redisLog(REDIS_WARNING, "WSA_IO_PENDING writing to socket fd %d", link->fd); + + if (result == SOCKET_ERROR && errno != WSA_IO_PENDING) { + redisLog(REDIS_WARNING, "Error writing to socket fd", link->fd); + handleLinkIOError(link); + return; + } +} +#else /* Send data. This is handled using a trivial send buffer that gets * consumed by write(). We don't try to optimize this for speed too much * as this is a very low traffic channel. */ @@ -2010,10 +2057,11 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { handleLinkIOError(link); return; } - sdsrange(link->sndbuf,(int)nwritten,-1); WIN_PORT_FIX /* cast (int) */ + sdsrange(link->sndbuf,nwritten,-1); if (sdslen(link->sndbuf) == 0) aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); } +#endif /* Read data. Try to read the first field of the header first to check the * full length of the packet. When a whole packet is in memory this function @@ -2046,7 +2094,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { "Bad message length or signature received " "from Cluster bus."); handleLinkIOError(link); - IF_WIN32(goto done,return); + return; } } readlen = ntohl(hdr->totlen) - rcvbuflen; @@ -2054,14 +2102,14 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } nread = read(fd,buf,readlen); - if (nread == -1 && errno == EAGAIN) IF_WIN32(goto done, return); /* No more data ready. */ + if (nread == -1 && errno == EAGAIN) { WIN32_ONLY(WSIOCP_QueueNextRead(fd);) return; } /* No more data ready. */ if (nread <= 0) { /* I/O error... */ redisLog(REDIS_DEBUG,"I/O error reading from node link: %s", (nread == 0) ? "connection closed" : strerror(errno)); handleLinkIOError(link); - IF_WIN32(goto done, return); + return; } else { /* Read data and recast the pointer to the new buffer. */ link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread); @@ -2075,11 +2123,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { sdsfree(link->rcvbuf); link->rcvbuf = sdsempty(); } else { - IF_WIN32(goto done, return); /* Link no longer valid. */ + return; /* Link no longer valid. */ } } } -WIN32_ONLY(done:) WIN32_ONLY(WSIOCP_QueueNextRead(fd);) }