提交 b93dc1c7 编写于 作者: A Alexis Campailla

Port diskless replication to Windows

During diskless replication the master forks a child, which on posix
simply inherits the socket file descriptors for the connections to
the slaves.
A unix pipe is also used for the child to report the results back
to the master.

The bulk of the porting work is in making sure that the socket
file descriptors and pipe file descriptor are propagated correctly
from the master to its child.
上级 b713fda5
......@@ -392,6 +392,8 @@ static RedisParamterMapper g_redisArgMap =
{ "repl-ping-slave-period", &fp1 }, // repl-ping-slave-period [number]
{ "repl-timeout", &fp1 }, // repl-timeout [number]
{ "repl-disable-tcp-nodelay", &fp1 }, // repl-disable-tcp-nodelay [yes/no]
{ "repl-diskless-sync", &fp1 }, // repl-diskless-sync [yes/no]
{ "repl-diskless-sync-delay", &fp1 }, // repl-diskless-sync-delay [number]
{ "repl-backlog-size", &fp1 }, // repl-backlog-size [number]
{ "repl-backlog-ttl", &fp1 }, // repl-backlog-ttl [number]
{ "slave-priority", &fp1 }, // slave-priority [number]
......
......@@ -43,6 +43,8 @@ redis_WSASend WSASend = NULL;
redis_WSARecv WSARecv = NULL;
redis_WSACleanup WSACleanup = NULL;
redis_WSAGetOverlappedResult WSAGetOverlappedResult = NULL;
redis_WSADuplicateSocket WSADuplicateSocket = NULL;
redis_WSASocket WSASocket = NULL;
// other API forwards
redis_fwrite fdapi_fwrite = NULL;
......@@ -53,8 +55,10 @@ redis_isatty isatty = NULL;
redis_access access = NULL;
redis_lseek64 lseek64 = NULL;
redis_get_osfhandle fdapi_get_osfhandle = NULL;
redis_open_osfhandle fdapi_open_osfhandle = NULL;
// Unix compatible FD based routines
redis_pipe pipe = NULL;
redis_socket socket = NULL;
redis_close fdapi_close = NULL;
redis_open open = NULL;
......@@ -290,6 +294,21 @@ int redis_socket_impl(int af,int type,int protocol) {
return rfd;
}
int redis_pipe_impl(int *pfds) {
int err = -1;
try {
// Not passing _O_NOINHERIT, the underlying handles are inheritable by default
err = crt_pipe(pfds, 8192, _O_BINARY);
if(err == 0) {
pfds[0] = RFDMap::getInstance().addPosixFD(pfds[0]);
pfds[1] = RFDMap::getInstance().addPosixFD(pfds[1]);
}
} CATCH_AND_REPORT()
return err;
}
// In unix a fd is a fd. All are closed with close().
auto f_closesocket = dllfunctor_stdcall<int, SOCKET>("ws2_32.dll", "closesocket");
int redis_close_impl(RFD rfd) {
......@@ -795,6 +814,40 @@ BOOL redis_WSAGetOverlappedResult_impl(int rfd, LPWSAOVERLAPPED lpOverlapped, LP
return SOCKET_ERROR;
}
auto f_WSADuplicateSocket = dllfunctor_stdcall<int, SOCKET, DWORD, LPWSAPROTOCOL_INFO>("ws2_32.dll", "WSADuplicateSocketW");
int redis_WSADuplicateSocket_impl(int rfd, DWORD dwProcessId, LPWSAPROTOCOL_INFO lpProtocolInfo) {
try {
SOCKET s = RFDMap::getInstance().lookupSocket( rfd );
if( s != INVALID_SOCKET ) {
return f_WSADuplicateSocket(s, dwProcessId, lpProtocolInfo);
} else {
errno = EBADF;
return SOCKET_ERROR;
}
} CATCH_AND_REPORT();
return SOCKET_ERROR;
}
auto f_WSASocket = dllfunctor_stdcall<SOCKET, int, int, int, LPWSAPROTOCOL_INFO, GROUP, DWORD>("ws2_32.dll", "WSASocketW");
int redis_WSASocket_impl(int af, int type, int protocol, LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g, DWORD dwFlags) {
RFD rfd = RFDMap::invalidRFD;
try {
SOCKET socket = f_WSASocket(af,
type,
protocol,
lpProtocolInfo,
g,
dwFlags);
if(socket != INVALID_SOCKET) {
rfd = RFDMap::getInstance().addSocket(socket);
}
} CATCH_AND_REPORT()
return rfd;
}
int redis_WSAIoctl_impl(RFD rfd,DWORD dwIoControlCode,LPVOID lpvInBuffer,DWORD cbInBuffer,LPVOID lpvOutBuffer,DWORD cbOutBuffer,LPDWORD lpcbBytesReturned,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine) {
try {
SOCKET s = RFDMap::getInstance().lookupSocket( rfd );
......@@ -1012,6 +1065,18 @@ intptr_t redis_get_osfhandle_impl(int fd) {
return -1;
}
int redis_open_osfhandle_impl(intptr_t osfhandle, int flags) {
RFD rfd = RFDMap::invalidRFD;
try {
int posixFD = crt_open_osfhandle(osfhandle, flags);
if(posixFD != -1) {
rfd = RFDMap::getInstance().addPosixFD(posixFD);
}
} CATCH_AND_REPORT()
return rfd;
}
auto f_freeaddrinfo = dllfunctor_stdcall<void, addrinfo*>("ws2_32.dll", "freeaddrinfo");
void redis_freeaddrinfo_impl(struct addrinfo *ai) {
f_freeaddrinfo(ai);
......@@ -1089,6 +1154,7 @@ private:
Win32_FDSockMap() {
InitWinsock();
pipe = redis_pipe_impl;
socket = redis_socket_impl;
fdapi_close = redis_close_impl;
open = redis_open_impl;
......@@ -1123,12 +1189,15 @@ private:
WSARecv = redis_WSARecv_impl;
WSACleanup = redis_WSACleanup_impl;
WSAGetOverlappedResult = redis_WSAGetOverlappedResult_impl;
WSADuplicateSocket = redis_WSADuplicateSocket_impl;
WSASocket = redis_WSASocket_impl;
select = redis_select_impl;
ntohl = redis_ntohl_impl;
isatty = redis_isatty_impl;
access = redis_access_impl;
lseek64 = redis_lseek64_impl;
fdapi_get_osfhandle = redis_get_osfhandle_impl;
fdapi_open_osfhandle = redis_open_osfhandle_impl;
freeaddrinfo = redis_freeaddrinfo_impl;
getaddrinfo = redis_getaddrinfo_impl;
inet_ntop = redis_inet_ntop_impl;
......
......@@ -149,11 +149,15 @@ typedef struct hostent* (*redis_gethostbyname)(const char *name);
typedef char* (*redis_inet_ntoa)(struct in_addr in);
typedef BOOL (*redis_WSAGetOverlappedResult)(int rfd,LPWSAOVERLAPPED lpOverlapped, LPDWORD lpcbTransfer, BOOL fWait, LPDWORD lpdwFlags);
typedef int (*redis_WSADuplicateSocket)(int rfd, DWORD dwProcessId, LPWSAPROTOCOL_INFO lpProtocolInfo);
typedef int (*redis_WSASocket)(int af, int type, int protocol, LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g, DWORD dwFlags);
// other API forwards
typedef int (*redis_setmode)(int fd,int mode);
typedef size_t (*redis_fwrite)(const void * _Str, size_t _Size, size_t _Count, FILE * _File);
// API prototypes must match the unix implementation
typedef int (*redis_pipe)(int pipefd[2]);
typedef int (*redis_socket)(int af,int type,int protocol);
typedef int (*redis_close)(int fd);
typedef int (*redis_open)(const char * _Filename, int _OpenFlag, int flags);
......@@ -186,6 +190,7 @@ typedef int (*redis_isatty)(int fd);
typedef int (*redis_access)(const char *pathname, int mode);
typedef u_int64 (*redis_lseek64)(int fd, u_int64 offset, int whence);
typedef intptr_t (*redis_get_osfhandle)(int fd);
typedef int (*redis_open_osfhandle)(intptr_t osfhandle, int flags);
typedef int(*redis_FD_ISSET)(int fd, fd_set *);
// access() mode definitions
......@@ -199,6 +204,7 @@ extern "C"
#endif
// API replacements
extern redis_pipe pipe;
extern redis_socket socket;
extern redis_WSASend WSASend;
extern redis_WSARecv WSARecv;
......@@ -207,6 +213,8 @@ extern redis_ioctlsocket ioctlsocket;
extern redis_inet_addr inet_addr;
extern redis_inet_ntoa inet_ntoa;
extern redis_WSAGetOverlappedResult WSAGetOverlappedResult;
extern redis_WSADuplicateSocket WSADuplicateSocket;
extern redis_WSASocket WSASocket;
extern redis_close fdapi_close;
extern redis_open open;
......@@ -239,6 +247,7 @@ extern redis_isatty isatty;
extern redis_access access;
extern redis_lseek64 lseek64;
extern redis_get_osfhandle fdapi_get_osfhandle;
extern redis_open_osfhandle fdapi_open_osfhandle;
extern redis_freeaddrinfo freeaddrinfo;
extern redis_getaddrinfo getaddrinfo;
extern redis_inet_ntop inet_ntop;
......
......@@ -310,6 +310,22 @@ BOOL QForkSlaveInit(HANDLE QForkConrolMemoryMapHandle, DWORD ParentProcessID) {
g_SlaveExitCode = do_rdbSave(g_pQForkControl->globalData.filename);
} else if (g_pQForkControl->typeOfOperation == OperationType::otAOF) {
g_SlaveExitCode = do_aofSave(g_pQForkControl->globalData.filename);
} else if (g_pQForkControl->typeOfOperation == OperationType::otSocket) {
LPWSAPROTOCOL_INFO lpProtocolInfo = (LPWSAPROTOCOL_INFO) g_pQForkControl->globalData.protocolInfo;
int pipe_write_fd = fdapi_open_osfhandle((intptr_t)g_pQForkControl->globalData.pipe_write_handle, _O_APPEND);
for (int i = 0; i < g_pQForkControl->globalData.numfds; i++) {
g_pQForkControl->globalData.fds[i] = WSASocket(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
&lpProtocolInfo[i],
0,
WSA_FLAG_OVERLAPPED);
}
g_SlaveExitCode = do_socketSave(g_pQForkControl->globalData.fds,
g_pQForkControl->globalData.numfds,
g_pQForkControl->globalData.clientids,
pipe_write_fd);
} else {
throw runtime_error("unexpected operation type");
}
......@@ -904,11 +920,22 @@ void CreateChildProcess(PROCESS_INFORMATION *pi, char* logfile, DWORD dwCreation
g_hForkedProcess = pi->hProcess;
}
BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalData, DWORD* childPID, uint32_t dictHashSeed, char* logfile) {
typedef void (*CHILD_PID_HOOK)(DWORD pid);
BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalData, DWORD* childPID, uint32_t dictHashSeed, char* logfile, CHILD_PID_HOOK pidHook = NULL) {
PROCESS_INFORMATION pi;
try {
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
CreateChildProcess(&pi, logfile, 0);
pi.hProcess = INVALID_HANDLE_VALUE;
if(pidHook != NULL) {
CreateChildProcess(&pi, logfile, CREATE_SUSPENDED);
pidHook(pi.dwProcessId);
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
ResumeThread(pi.hThread);
} else {
CopyForkOperationData(type, globalData, sizeOfGlobalData, dictHashSeed);
CreateChildProcess(&pi, logfile, 0);
}
*childPID = pi.dwProcessId;
CloseHandle(pi.hThread);
......@@ -932,6 +959,9 @@ BOOL BeginForkOperation(OperationType type, LPVOID globalData, int sizeOfGlobalD
catch(...) {
::redisLog(REDIS_WARNING, "BeginForkOperation: other exception caught.\n");
}
if(pi.hProcess != INVALID_HANDLE_VALUE) {
TerminateProcess(pi.hProcess, 1);
}
return FALSE;
}
......@@ -959,6 +989,43 @@ BOOL BeginForkOperation_Aof(
return BeginForkOperation(otAOF, globalData, sizeOfGlobalData, childPID, dictHashSeed, logfile);
}
// Child-PID hook for the socket fork operation.
//
// Allocates one WSAPROTOCOL_INFO per descriptor in globalData.fds, publishes
// the array through globalData.protocolInfo, and fills each entry by calling
// WSADuplicateSocket for the process identified by dwProcessId.
//
// Throws runtime_error when the allocation or any duplication fails, so the
// caller's exception handling can abandon the operation instead of letting
// the child run with unfilled protocol information.
// NOTE(review): the array is not released here, neither on success nor on
// failure - confirm who owns it.
void BeginForkOperation_Socket_PidHook(DWORD dwProcessId) {
    const int numfds = g_pQForkControl->globalData.numfds;

    WSAPROTOCOL_INFO* protocolInfo = (WSAPROTOCOL_INFO*)dlmalloc(sizeof(WSAPROTOCOL_INFO) * numfds);
    if (protocolInfo == NULL && numfds > 0) {
        throw runtime_error("BeginForkOperation_Socket_PidHook: unable to allocate protocol info");
    }
    g_pQForkControl->globalData.protocolInfo = protocolInfo;

    for (int i = 0; i < numfds; i++) {
        // WSADuplicateSocket returns zero on success.
        if (WSADuplicateSocket(g_pQForkControl->globalData.fds[i], dwProcessId, &protocolInfo[i]) != 0) {
            throw runtime_error("BeginForkOperation_Socket_PidHook: WSADuplicateSocket failed");
        }
    }
}
// Starts a fork operation of type otSocket.
//
// fds / numfds / clientids: stored in the fork control block for the child.
// pipe_write_fd: descriptor whose OS handle is stored in the control block
//                for the child.
// globalData, sizeOfGlobalData, childPID, dictHashSeed, logfile: forwarded
//                unchanged to BeginForkOperation.
//
// Returns FALSE without starting the operation when pipe_write_fd has no
// valid OS handle; otherwise returns the result of BeginForkOperation.
BOOL BeginForkOperation_Socket(
    int *fds,
    int numfds,
    uint64_t *clientids,
    int pipe_write_fd,
    LPVOID globalData,
    int sizeOfGlobalData,
    DWORD* childPID,
    unsigned __int32 dictHashSeed,
    char* logfile)
{
    // Resolve the handle first so nothing is published to the control block
    // when the descriptor is bad.
    HANDLE pipe_write_handle = (HANDLE)_get_osfhandle(pipe_write_fd);
    if (pipe_write_handle == INVALID_HANDLE_VALUE) {
        ::redisLog(REDIS_WARNING, "BeginForkOperation_Socket: invalid pipe write descriptor.\n");
        return FALSE;
    }

    g_pQForkControl->globalData.fds = fds;
    g_pQForkControl->globalData.numfds = numfds;
    g_pQForkControl->globalData.clientids = clientids;

    // The handle is already inheritable so there is no need to duplicate it
    g_pQForkControl->globalData.pipe_write_handle = (pipe_write_handle);

    return BeginForkOperation(otSocket,
                              globalData,
                              sizeOfGlobalData,
                              childPID,
                              dictHashSeed,
                              logfile,
                              BeginForkOperation_Socket_PidHook);
}
OperationStatus GetForkOperationStatus() {
if (WaitForSingleObject(g_pQForkControl->operationComplete, 0) == WAIT_OBJECT_0) {
return OperationStatus::osCOMPLETE;
......
......@@ -23,6 +23,7 @@
#pragma once
#include <Windows.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
......@@ -31,7 +32,8 @@ extern "C" {
typedef enum operationType {
otINVALID = 0,
otRDB = 1,
otAOF = 2
otAOF = 2,
otSocket = 3
} OperationType;
typedef enum operationStatus {
......@@ -49,10 +51,15 @@ typedef enum startupStatus {
#define MAX_GLOBAL_DATA 10000
typedef struct QForkBeginInfo {
char filename[MAX_PATH];
BYTE globalData[MAX_GLOBAL_DATA];
size_t globalDataSize;
unsigned __int32 dictHashSeed;
char filename[MAX_PATH];
int *fds;
int numfds;
uint64_t *clientids;
HANDLE pipe_write_handle;
LPVOID protocolInfo;
} QForkStartupInfo;
StartupStatus QForkStartup(int argc, char** argv);
......@@ -75,6 +82,17 @@ BOOL BeginForkOperation_Aof(
unsigned __int32 dictHashSeed,
char* logfile);
BOOL BeginForkOperation_Socket(
int *fds,
int numfds,
uint64_t *clientids,
int pipe_write_fd,
LPVOID globalData,
int sizeOfGlobalData,
DWORD* childPID,
unsigned __int32 dictHashSeed,
char* logfile);
OperationStatus GetForkOperationStatus();
BOOL EndForkOperation(int * pExitCode);
BOOL AbortForkOperation();
......
......@@ -58,3 +58,83 @@ int do_aofSave(char* filename)
return REDIS_OK;
}
// This function is meant to be an exact replica of the fork() child path in rdbSaveToSlavesSockets
//
// fds / numfds: descriptors the RDB payload is written to; the fds array is
//               released with zfree() once the rio has been initialised.
// clientids:    one id per descriptor, copied into the report sent to the parent.
//
// Returns REDIS_OK or REDIS_ERR. When NO_QFORKIMPL is defined the function
// does nothing and returns REDIS_OK.
int do_socketSave2(int *fds, int numfds, uint64_t *clientids)
{
#ifndef NO_QFORKIMPL
    int retval;
    rio slave_sockets;

    server.rdb_child_pid = GetCurrentProcessId();

    rioInitWithFdset(&slave_sockets,fds,numfds);
    zfree(fds);

    // On Windows we haven't duplicated the listening sockets so we shouldn't close them
#ifndef _WIN32
    closeListeningSockets(0);
#endif

    redisSetProcTitle("redis-rdb-to-slaves");

    retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
    /* A zero result from rioFlush() is treated as a failure. */
    if (retval == REDIS_OK && rioFlush(&slave_sockets) == 0)
        retval = REDIS_ERR;

    if (retval == REDIS_OK) {
        size_t private_dirty = zmalloc_get_private_dirty();

        if (private_dirty) {
            redisLog(REDIS_NOTICE,
                "RDB: %zu MB of memory used by copy-on-write",
                private_dirty/(1024*1024));
        }

        /* If we are returning OK, at least one slave was served
         * with the RDB file as expected, so we need to send a report
         * to the parent via the pipe. The format of the message is:
         *
         * <len> <slave[0].id> <slave[0].error> ...
         *
         * len, slave IDs, and slave errors, are all uint64_t integers,
         * so basically the reply is composed of 64 bits for the len field
         * plus 2 additional 64 bit integers for each entry, for a total
         * of 'len' entries.
         *
         * The 'id' represents the slave's client ID, so that the master
         * can match the report with a specific slave, and 'error' is
         * set to 0 if the replication process terminated with a success
         * or the error code if an error occurred. */
        void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
        /* Explicit cast keeps the conversion valid under a C++ compiler too. */
        uint64_t *len = (uint64_t *)msg;
        uint64_t *ids = len+1;
        int j, msglen;

        *len = numfds;
        for (j = 0; j < numfds; j++) {
            *ids++ = clientids[j];
            *ids++ = slave_sockets.io.fdset.state[j];
        }

        /* Write the message to the parent. If we have no good slaves or
         * we are unable to transfer the message to the parent, we exit
         * with an error so that the parent will abort the replication
         * process with all the children that were waiting. */
        msglen = sizeof(uint64_t)*(1+2*numfds);
        if (*len == 0 ||
            write(server.rdb_pipe_write_result_to_parent,msg,msglen)
            != msglen)
        {
            retval = REDIS_ERR;
        }

        /* The report buffer is no longer needed once the write was attempted. */
        zfree(msg);
    }
    return retval;
#else
    return REDIS_OK;
#endif
}
// Entry point for the socket save: records the descriptor used to report
// results to the parent in server.rdb_pipe_write_result_to_parent, then
// delegates to do_socketSave2() and returns its result.
int do_socketSave(int *fds, int numfds, uint64_t *clientids, int pipe_write_fd)
{
    int status;

    server.rdb_pipe_write_result_to_parent = pipe_write_fd;
    status = do_socketSave2(fds, numfds, clientids);

    return status;
}
......@@ -29,7 +29,8 @@ extern "C" {
void SetupGlobals(LPVOID globalData, size_t globalDataSize, unsigned __int32 dictHashKey);
int do_rdbSave(char* filename);
int do_aofSave(char* filename);
int do_socketSave(int *fds, int numfds, uint64_t *clientids, int pipe_write_fd);
#ifdef __cplusplus
}
#endif
\ No newline at end of file
#endif
......@@ -24,6 +24,10 @@
#include <io.h>
#include <stdlib.h>
/* Forwards to the CRT _pipe() with the same arguments and returns its result. */
int crt_pipe(int *pfds, unsigned int psize, int textmode) {
    int rc = _pipe(pfds, psize, textmode);
    return rc;
}
/* Forwards to the CRT _close() for descriptor fd and returns its result. */
int crt_close(int fd) {
    int rc = _close(fd);
    return rc;
}
......@@ -40,6 +44,10 @@ int crt_open(const char *filename, int oflag, int pmode) {
return _open(filename, oflag, pmode);
}
/* Forwards to the CRT _open_osfhandle() and returns the descriptor it yields. */
int crt_open_osfhandle(intptr_t osfhandle, int flags) {
    int fd = _open_osfhandle(osfhandle, flags);
    return fd;
}
/* Forwards to the CRT _get_osfhandle() and returns the OS handle value for fd. */
intptr_t crtget_osfhandle(int fd) {
    intptr_t handle = _get_osfhandle(fd);
    return handle;
}
......
......@@ -23,13 +23,16 @@
#include <cstdint>
#include <stdio.h>
int crt_pipe(int *pfds, unsigned int psize, int textmode);
int crt_close(int fd);
int crt_read(int fd, void *buffer, unsigned int count);
int crt_write(int fd, const void *buffer, unsigned int count);
int crt_open(const char *filename, int oflag, int pmode);
int crt_open_osfhandle(intptr_t osfhandle, int flags);
intptr_t crtget_osfhandle(int fd);
int crtsetmode(int fd, int mode);
size_t crtfwrite(const void * _Str, size_t _Size, size_t _Count, FILE * _File);
int crt_isatty(int fd);
int crt_access(const char *pathname, int mode);
__int64 crt_lseek64(int fd, __int64 offset, int origin);
\ No newline at end of file
__int64 crt_lseek64(int fd, __int64 offset, int origin);
......@@ -89,8 +89,6 @@ typedef unsigned __int32 u_int32_t;
#undef usleep
#define usleep(x) (x == 1) ? Sleep(0) : Sleep((int)((x)/1000))
#define pipe(fds) _pipe(fds, 8192, _O_BINARY|_O_NOINHERIT)
/* Processes */
#define waitpid(pid,statusp,options) _cwait (statusp, pid, WAIT_CHILD)
......
......@@ -1476,6 +1476,8 @@ int rdbSaveToSlavesSockets(void) {
/* Create the child process. */
start = ustime();
#ifndef _WIN32
if ((childpid = fork()) == 0) {
/* Child */
int retval;
......@@ -1540,6 +1542,13 @@ int rdbSaveToSlavesSockets(void) {
}
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
#else // #ifndef _WIN32
if (!BeginForkOperation_Socket(fds, numfds, clientids, pipefds[1], &server, sizeof(server), &server.rdb_child_pid, dictGetHashFunctionSeed(), server.logfile)) {
redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno));
return REDIS_ERR;
} else {
childpid = server.rdb_child_pid;
#endif
/* Parent */
zfree(clientids); /* Not used by parent. Free ASAP. */
server.stat_fork_time = ustime()-start;
......
......@@ -1118,11 +1118,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (GetForkOperationStatus() == osCOMPLETE || GetForkOperationStatus() == osFAILED) {
int exitCode;
int bySignal;
OperationType type = ((server.rdb_child_pid != -1) ? otRDB : otAOF);
bySignal = (int)(GetForkOperationStatus() == osFAILED);
redisLog(REDIS_WARNING, (bySignal ? "fork operation failed" : "fork operation complete"));
EndForkOperation(&exitCode);
if (type == otRDB) {
if (server.rdb_child_pid != -1) {
backgroundSaveDoneHandler(exitCode, bySignal);
} else {
backgroundRewriteDoneHandler(exitCode, bySignal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册