提交 13732168 编写于 作者: A antirez

Incrementally flush RDB on disk while loading it from a master.

This fixes issue #539.

Basically if there is enough free memory the OS may buffer the RDB file
that the slave transfers on disk from the master. The file may
actually be flused on disk at once by the operating system when it gets
closed by Redis, causing the close system call to block for a long time.

This patch is a modified version of one provided by yoav-steinberg of
@garantiadata (the original version was posted in the issue #539
comments), and tries to flush the OS buffers incrementally (every 8 MB
of loaded data).
上级 06bd3b9a
......@@ -52,6 +52,14 @@
#define aof_fsync fsync
#endif
/* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use
* the plain fsync() call. */
#ifdef __linux__
#define rdb_fsync_range(fd,off,size) sync_file_range(fd,off,size,SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE)
#else
#define rdb_fsync_range(fd,off,size) fsync(fd)
#endif
/* Byte ordering detection */
#include <sys/types.h> /* This will likely define BYTE_ORDER */
......
......@@ -3,6 +3,10 @@
#define _BSD_SOURCE
#if defined(__linux__)
#define _GNU_SOURCE
#endif
#if defined(__linux__) || defined(__OpenBSD__)
#define _XOPEN_SOURCE 700
#else
......
......@@ -2032,9 +2032,10 @@ sds genRedisInfoString(char *section) {
if (server.repl_state == REDIS_REPL_TRANSFER) {
info = sdscatprintf(info,
"master_sync_left_bytes:%ld\r\n"
"master_sync_left_bytes:%lld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n"
,(long)server.repl_transfer_left,
, (long long)
(server.repl_transfer_size - server.repl_transfer_read),
(int)(server.unixtime-server.repl_transfer_lastio)
);
}
......
......@@ -549,7 +549,9 @@ struct redisServer {
redisClient *master; /* Client that is master for this slave */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_left; /* Bytes left reading .rdb */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
......
......@@ -311,16 +311,18 @@ void replicationAbortSyncTransfer(void) {
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen;
off_t left;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
/* If repl_transfer_left == -1 we still have to read the bulk length
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_left == -1) {
if (server.repl_transfer_size == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
......@@ -343,16 +345,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
goto error;
}
server.repl_transfer_left = strtol(buf+1,NULL,10);
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %ld bytes from master",
server.repl_transfer_left);
server.repl_transfer_size);
return;
}
/* Read bulk data */
readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
server.repl_transfer_left : (signed)sizeof(buf);
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
......@@ -365,9 +367,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
goto error;
}
server.repl_transfer_left -= nread;
server.repl_transfer_read += nread;
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
/* Check if the transfer is now complete */
if (server.repl_transfer_left == 0) {
if (server.repl_transfer_read == server.repl_transfer_size) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
......@@ -538,7 +554,9 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
server.repl_state = REDIS_REPL_TRANSFER;
server.repl_transfer_left = -1;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册