提交 c1c9d551 编写于 作者: A antirez

Fix for bug 561 and other related problems

上级 634bae94
...@@ -947,3 +947,28 @@ void clientCommand(redisClient *c) { ...@@ -947,3 +947,28 @@ void clientCommand(redisClient *c) {
addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)"); addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
} }
} }
void rewriteClientCommandVector(redisClient *c, int argc, ...) {
va_list ap;
int j;
robj **argv; /* The new argument vector */
argv = zmalloc(sizeof(robj*)*argc);
va_start(ap,argc);
for (j = 0; j < argc; j++) {
robj *a;
a = va_arg(ap, robj*);
argv[j] = a;
incrRefCount(a);
}
/* We free the objects in the original vector at the end, so we are
* sure that if the same objects are reused in the new vector the
* refcount gets incremented before it gets decremented. */
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
/* Replace argv and argc with our new versions. */
c->argv = argv;
c->argc = argc;
va_end(ap);
}
...@@ -819,6 +819,7 @@ void addReplyMultiBulkLen(redisClient *c, long length); ...@@ -819,6 +819,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o); void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list, void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer); unsigned long *biggest_input_buffer);
void rewriteClientCommandVector(redisClient *c, int argc, ...);
#ifdef __GNUC__ #ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
......
...@@ -640,7 +640,9 @@ void lremCommand(redisClient *c) { ...@@ -640,7 +640,9 @@ void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz. * as well. This command was originally proposed by Ezra Zygmuntowicz.
*/ */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
robj *aux;
if (!handleClientsWaitingListPush(c,dstkey,value)) { if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */ /* Create the list if the key does not exist */
if (!dstobj) { if (!dstobj) {
...@@ -648,9 +650,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value ...@@ -648,9 +650,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dbAdd(c->db,dstkey,dstobj); dbAdd(c->db,dstkey,dstobj);
} else { } else {
signalModifiedKey(c->db,dstkey); signalModifiedKey(c->db,dstkey);
server.dirty++;
} }
listTypePush(dstobj,value,REDIS_HEAD); listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key
* watched by BLPOPLPUSH, we need to rewrite the command vector.
* But if this is called directly by RPOPLPUSH (either directly
* or via a BRPOPLPUSH where the popped list exists)
* we should replicate the BRPOPLPUSH command itself. */
if (c != origclient) {
aux = createStringObject("LPUSH",5);
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
decrRefCount(aux);
} else {
/* Make sure to always use RPOPLPUSH in the replication / AOF,
* even if the original command was BRPOPLPUSH. */
aux = createStringObject("RPOPLPUSH",9);
rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
decrRefCount(aux);
}
server.dirty++;
} }
/* Always send the pushed value to the client. */ /* Always send the pushed value to the client. */
...@@ -666,16 +684,22 @@ void rpoplpushCommand(redisClient *c) { ...@@ -666,16 +684,22 @@ void rpoplpushCommand(redisClient *c) {
addReply(c,shared.nullbulk); addReply(c,shared.nullbulk);
} else { } else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]); robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];
if (dobj && checkType(c,dobj,REDIS_LIST)) return; if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL); value = listTypePop(sobj,REDIS_TAIL);
rpoplpushHandlePush(c,c->argv[2],dobj,value); /* We saved touched key, and protect it, since rpoplpushHandlePush
* may change the client command argument vector. */
incrRefCount(touchedkey);
rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */ /* listTypePop returns an object with its refcount incremented */
decrRefCount(value); decrRefCount(value);
/* Delete the source list when it is empty */ /* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++; server.dirty++;
} }
} }
...@@ -777,6 +801,7 @@ void unblockClientWaitingData(redisClient *c) { ...@@ -777,6 +801,7 @@ void unblockClientWaitingData(redisClient *c) {
/* Cleanup the client structure */ /* Cleanup the client structure */
zfree(c->bpop.keys); zfree(c->bpop.keys);
c->bpop.keys = NULL; c->bpop.keys = NULL;
if (c->bpop.target) decrRefCount(c->bpop.target);
c->bpop.target = NULL; c->bpop.target = NULL;
c->flags &= ~REDIS_BLOCKED; c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED; c->flags |= REDIS_UNBLOCKED;
...@@ -820,6 +845,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { ...@@ -820,6 +845,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
receiver = ln->value; receiver = ln->value;
dstkey = receiver->bpop.target; dstkey = receiver->bpop.target;
/* Protect receiver->bpop.target, that will be freed by
* the next unblockClientWaitingData() call. */
if (dstkey) incrRefCount(dstkey);
/* This should remove the first element of the "clients" list. */ /* This should remove the first element of the "clients" list. */
unblockClientWaitingData(receiver); unblockClientWaitingData(receiver);
...@@ -828,17 +857,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { ...@@ -828,17 +857,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyMultiBulkLen(receiver,2); addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key); addReplyBulk(receiver,key);
addReplyBulk(receiver,ele); addReplyBulk(receiver,ele);
return 1; return 1; /* Serve just the first client as in B[RL]POP semantics */
} else { } else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey); dstobj = lookupKeyWrite(receiver->db,dstkey);
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
decrRefCount(dstkey); rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
} else {
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
decrRefCount(dstkey); decrRefCount(dstkey);
return 1; return 1;
} }
decrRefCount(dstkey);
} }
} }
......
...@@ -332,7 +332,7 @@ void scardCommand(redisClient *c) { ...@@ -332,7 +332,7 @@ void scardCommand(redisClient *c) {
} }
void spopCommand(redisClient *c) { void spopCommand(redisClient *c) {
robj *set, *ele; robj *set, *ele, *aux;
int64_t llele; int64_t llele;
int encoding; int encoding;
...@@ -348,16 +348,11 @@ void spopCommand(redisClient *c) { ...@@ -348,16 +348,11 @@ void spopCommand(redisClient *c) {
setTypeRemove(set,ele); setTypeRemove(set,ele);
} }
/* Change argv to replicate as SREM */ /* Replicate/AOF this command as an SREM operation */
c->argc = 3; aux = createStringObject("SREM",4);
c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc)); rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
decrRefCount(ele);
/* Overwrite SREM with SPOP (same length) */ decrRefCount(aux);
redisAssert(sdslen(c->argv[0]->ptr) == 4);
memcpy(c->argv[0]->ptr, "SREM", 4);
/* Popped element already has incremented refcount */
c->argv[2] = ele;
addReplyBulk(c,ele); addReplyBulk(c,ele);
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
......
...@@ -6,6 +6,24 @@ start_server {tags {"repl"}} { ...@@ -6,6 +6,24 @@ start_server {tags {"repl"}} {
s -1 role s -1 role
} {slave} } {slave}
test {BRPOPLPUSH replication, when blocking against empty list} {
set rd [redis_deferring_client]
$rd brpoplpush a b 5
r lpush a foo
after 1000
assert_equal [r debug digest] [r -1 debug digest]
}
test {BRPOPLPUSH replication, list exists} {
set rd [redis_deferring_client]
r lpush c 1
r lpush c 2
r lpush c 3
$rd brpoplpush c d 5
after 1000
assert_equal [r debug digest] [r -1 debug digest]
}
test {MASTER and SLAVE dataset should be identical after complex ops} { test {MASTER and SLAVE dataset should be identical after complex ops} {
createComplexDataset r 10000 createComplexDataset r 10000
after 500 after 500
......
...@@ -278,6 +278,14 @@ start_server { ...@@ -278,6 +278,14 @@ start_server {
r exec r exec
} {foo bar {} {} {bar foo}} } {foo bar {} {} {bar foo}}
test {BRPOPLPUSH timeout} {
set rd [redis_deferring_client]
$rd brpoplpush foo_list bar_list 1
after 2000
$rd read
} {}
foreach {pop} {BLPOP BRPOP} { foreach {pop} {BLPOP BRPOP} {
test "$pop: with single empty list argument" { test "$pop: with single empty list argument" {
set rd [redis_deferring_client] set rd [redis_deferring_client]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册