提交 dbd8c753 编写于 作者: A antirez

REPLCONF internal command introduced.

The REPLCONF command is an internal command (not designed to be directly
used by normal clients) that allows a slave to set some replication
related state in the master before issuing SYNC to start the
replication.

The initial motivation for this command, and the only reason currently
it is used by the implementation, is to let the slave instance
communicate its listening port to the slave, so that the master can
show all the slaves with their listening ports in the "replication"
section of the INFO output.

This allows clients to auto discover and query all the slaves attached
into a master.

Currently only a single option of the REPLCONF command is supported, and
it is called "listening-port", so the slave now starts the replication
process with something like the following chat:

    REPLCONF listening-prot 6380
    SYNC

Note that this works even if the master is an older version of Redis and
does not understand REPLCONF, because the slave ignores the REPLCONF
error.

In the future REPLCONF can be used for partial replication and other
replication related features where there is the need to exchange
information between master and slave.

NOTE: This commit also fixes a bug: the INFO outout already carried
information about slaves, but the port was broken, and was obtained
with getpeername(2), so it was actually just the ephemeral port used
by the slave to connect to the master as a client.
上级 b3f28b90
......@@ -55,6 +55,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
......
......@@ -215,6 +215,7 @@ struct redisCommand redisCommandTable[] = {
{"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
{"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
......@@ -2075,7 +2076,7 @@ sds genRedisInfoString(char *section) {
}
if (state == NULL) continue;
info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
slaveid,ip,port,state);
slaveid,ip,slave->slave_listening_port,state);
slaveid++;
}
}
......
......@@ -346,6 +346,7 @@ typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
......@@ -1110,6 +1111,7 @@ void scriptCommand(redisClient *c);
void timeCommand(redisClient *c);
void bitopCommand(redisClient *c);
void bitcountCommand(redisClient *c);
void replconfCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
......
......@@ -145,6 +145,46 @@ void syncCommand(redisClient *c) {
return;
}
/* REPLCONF <option> <value> <option> <value> ...
* This command is used by a slave in order to configure the replication
* process before starting it with the SYNC command.
*
* Currently the only use of this command is to communicate to the master
* what is the listening port of the Slave redis instance, so that the
* master can accurately list slaves and their listening ports in
* the INFO output.
*
* In the future the same command can be used in order to configure
* the replication to initiate an incremental replication instead of a
* full resync. */
void replconfCommand(redisClient *c) {
int j;
if ((c->argc % 2) == 0) {
/* Number of arguments must be odd to make sure that every
* option has a corresponding value. */
addReply(c,shared.syntaxerr);
return;
}
/* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) {
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port;
if ((getLongFromObjectOrReply(c,c->argv[j+1],
&port,NULL) != REDIS_OK))
return;
c->slave_listening_port = port;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
return;
}
}
addReply(c,shared.ok);
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
......@@ -378,8 +418,54 @@ error:
return;
}
/* Send a synchronous command to the master. Used to send AUTH and
* REPLCONF commadns before starting the replication with SYNC.
*
* On success NULL is returned.
* On error an sds string describing the error is returned.
*/
char *sendSynchronousCommand(int fd, ...) {
va_list ap;
sds cmd = sdsempty();
char *arg, buf[256];
/* Create the command to send to the master, we use simple inline
* protocol for simplicity as currently we only send simple strings. */
va_start(ap,fd);
while(1) {
arg = va_arg(ap, char*);
if (arg == NULL) break;
if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
cmd = sdscat(cmd,arg);
}
cmd = sdscatlen(cmd,"\r\n",2);
/* Transfer command to the server. */
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
sdsfree(cmd);
return sdscatprintf(sdsempty(),"Writing to master: %s",
strerror(errno));
}
sdsfree(cmd);
/* Read the reply from the server. */
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
{
return sdscatprintf(sdsempty(),"Reading from master: %s",
strerror(errno));
}
/* Check for errors from the server. */
if (buf[0] != '+') {
return sdscatprintf(sdsempty(),"Error from master: %s", buf);
}
return NULL; /* No errors. */
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[1024], tmpfile[256];
char tmpfile[256], *err;
int dfd, maxtries = 5;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
......@@ -400,24 +486,26 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* AUTH with the master if required. */
if(server.masterauth) {
char authcmd[1024];
size_t authlen;
authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
strerror(errno));
goto error;
}
/* Read the AUTH result. */
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
strerror(errno));
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
if (err) {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
if (buf[0] != '+') {
redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
goto error;
}
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
{
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
NULL);
sdsfree(port);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err) {
redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
sdsfree(err);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册