提交 5aa5e3af 编写于 作者: A antirez

Streams: use alsoPropagate() when in command context.

Related to #7105.
上级 c479eace
......@@ -395,7 +395,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
};
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
0, group, consumer, noack, &pi, 1);
/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
......
......@@ -3088,7 +3088,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
*
* However for functions that need to (also) propagate out of the context of a
* command execution, for example when serving a blocked client, you
* want to use propagate().
* can use propagate() as alsoPropagate() will just fall back to propagate()
* semantics in this case.
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
......@@ -3114,11 +3115,20 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int target)
{
if (server.loading) return; /* No propagation during loading. */
/* If we aren't in the context of a command execution, we just
* propagate() directly: there is no command execution and we are not
* in the context of call(), we would lost this propagation. */
printf("%p\n", (void*)server.current_client);
if (server.current_client == NULL) {
propagate(cmd,dbid,argv,argc,target);
return;
}
robj **argvcopy;
int j;
if (server.loading) return; /* No propagation during loading. */
argvcopy = zmalloc(sizeof(robj*)*argc);
for (j = 0; j < argc; j++) {
argvcopy[j] = argv[j];
......
......@@ -99,7 +99,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
......
......@@ -42,7 +42,7 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
......@@ -824,8 +824,10 @@ robj *createObjectFromStreamID(streamID *id) {
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
* to propagate this changes in the form of XCLAIM commands.
* 'external' should be true if this command is called outside the
* function implementing a command. */
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack, int external) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
......@@ -848,7 +850,12 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
argv[11] = createStringObject("JUSTID",6);
argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
if (external)
propagate(server.xclaimCommand,c->db->id,argv,14,
PROPAGATE_AOF|PROPAGATE_REPL);
else
alsoPropagate(server.xclaimCommand,c->db->id,argv,14,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
......@@ -867,15 +874,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
*
* 'external' should be true if this command is called outside the
* function implementing a command. */
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname, int external) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("SETID",5);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
if (external)
propagate(server.xgroupCommand,c->db->id,argv,5,
PROPAGATE_AOF|PROPAGATE_REPL);
else
alsoPropagate(server.xgroupCommand,c->db->id,argv,5,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
......@@ -924,12 +938,16 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
* streamReplyWithRange() in order to emit single entries (found in the
* PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
* flag.
*
* Since this command also handles propagation, the last argument 'external'
* should be set to 1 if the function is called outside the context of
* a function implementing a Redis command.
*/
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
......@@ -943,7 +961,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* as delivered. */
if (group && (flags & STREAM_RWR_HISTORY)) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
consumer);
consumer,external);
}
if (!(flags & STREAM_RWR_RAWENTRIES))
......@@ -1017,12 +1035,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack,external);
decrRefCount(idarg);
}
} else {
if (propagate_last_id)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
streamPropagateGroupID(c,spi->keyname,group,spi->groupname,external);
}
arraylen++;
......@@ -1045,8 +1063,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* This function is more expensive because it needs to inspect the PEL and then
* seek into the radix tree of the messages in order to emit the full message
* to the client. However clients only reach this code path when they are
* fetching the history of already retrieved messages, which is rare. */
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
* fetching the history of already retrieved messages, which is rare.
*
* See streamReplyWithRange() for the meaning of the last argument
* 'external'. This function is only called from streamReplyWithRange(). */
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external) {
raxIterator ri;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
......@@ -1062,7 +1083,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL) == 0)
STREAM_RWR_RAWENTRIES,NULL,external) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
* about a message that's no longer here because was removed
......@@ -1324,7 +1345,7 @@ void xrangeGenericCommand(client *c, int rev) {
addReplyNullArray(c);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,0);
}
}
......@@ -1563,7 +1584,7 @@ void xreadCommand(client *c) {
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, flags, &spi);
consumer, flags, &spi, 0);
if (groups) server.dirty++;
}
}
......@@ -2341,19 +2362,19 @@ void xclaimCommand(client *c) {
addReplyStreamID(c,&id);
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,0);
if (!emitted) addReplyNull(c);
}
arraylen++;
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack,0);
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
}
}
if (propagate_last_id) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
streamPropagateGroupID(c,c->argv[1],group,c->argv[2],0);
server.dirty++;
}
setDeferredArrayLen(c,arraylenptr,arraylen);
......@@ -2586,11 +2607,11 @@ NULL
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
STREAM_RWR_RAWENTRIES,NULL,0);
if (!count) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
STREAM_RWR_RAWENTRIES,NULL,0);
if (!count) addReplyNull(c);
} else {
addReplySubcommandSyntaxError(c);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册