From 5aa5e3af99092a5ad34a2d24ce4eb91c3f0c5227 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 17 Apr 2020 15:46:43 +0200 Subject: [PATCH] Streams: use alsoPropagate() when in command context. Related to #7105. --- src/blocked.c | 2 +- src/server.c | 16 ++++++++++--- src/stream.h | 2 +- src/t_stream.c | 63 +++++++++++++++++++++++++++++++++----------------- 4 files changed, 57 insertions(+), 26 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 00cc798d5..d1e718158 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -395,7 +395,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, noack, &pi); + 0, group, consumer, noack, &pi, 1); /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer diff --git a/src/server.c b/src/server.c index fc9b87aae..39c51b973 100644 --- a/src/server.c +++ b/src/server.c @@ -3088,7 +3088,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { * * However for functions that need to (also) propagate out of the context of a * command execution, for example when serving a blocked client, you - * want to use propagate(). + * can use propagate() as alsoPropagate() will just fall back to propagate() + * semantics in this case. */ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) @@ -3114,11 +3115,20 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target) { + if (server.loading) return; /* No propagation during loading. */ + + /* If we aren't in the context of a command execution, we just + * propagate() directly: there is no command execution and we are not + * in the context of call(), we would lost this propagation. */ + printf("%p\n", (void*)server.current_client); + if (server.current_client == NULL) { + propagate(cmd,dbid,argv,argc,target); + return; + } + robj **argvcopy; int j; - if (server.loading) return; /* No propagation during loading. */ - argvcopy = zmalloc(sizeof(robj*)*argc); for (j = 0; j < argc; j++) { argvcopy[j] = argv[j]; diff --git a/src/stream.h b/src/stream.h index b69073994..ffc521a08 100644 --- a/src/stream.h +++ b/src/stream.h @@ -99,7 +99,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 155167af9..b4f84ed19 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -42,7 +42,7 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -824,8 +824,10 @@ robj *createObjectFromStreamID(streamID *id) { /* As a result of an explicit XCLAIM or XREADGROUP command, new entries * are created in the pending list of the stream and consumers. We need - * to propagate this changes in the form of XCLAIM commands. */ -void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { + * to propagate this changes in the form of XCLAIM commands. + * 'external' should be true if this command is called outside the + * function implementing a command. */ +void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack, int external) { /* We need to generate an XCLAIM that will work in a idempotent fashion: * * XCLAIM 0 TIME @@ -848,7 +850,12 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam argv[11] = createStringObject("JUSTID",6); argv[12] = createStringObject("LASTID",6); argv[13] = createObjectFromStreamID(&group->last_id); - propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); + if (external) + propagate(server.xclaimCommand,c->db->id,argv,14, + PROPAGATE_AOF|PROPAGATE_REPL); + else + alsoPropagate(server.xclaimCommand,c->db->id,argv,14, + PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -867,15 +874,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam * propagate the last ID just using the XCLAIM LASTID option, so we emit * * XGROUP SETID - */ -void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { + * + * 'external' should be true if this command is called outside the + * function implementing a command. */ +void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname, int external) { robj *argv[5]; argv[0] = createStringObject("XGROUP",6); argv[1] = createStringObject("SETID",5); argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); - alsoPropagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + if (external) + propagate(server.xgroupCommand,c->db->id,argv,5, + PROPAGATE_AOF|PROPAGATE_REPL); + else + alsoPropagate(server.xgroupCommand,c->db->id,argv,5, + PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[1]); decrRefCount(argv[4]); @@ -924,12 +938,16 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna * streamReplyWithRange() in order to emit single entries (found in the * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES * flag. + * + * Since this command also handles propagation, the last argument 'external' + * should be set to 1 if the function is called outside the context of + * a function implementing a Redis command. */ #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ #define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, int external) { void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; @@ -943,7 +961,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * as delivered. */ if (group && (flags & STREAM_RWR_HISTORY)) { return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, - consumer); + consumer,external); } if (!(flags & STREAM_RWR_RAWENTRIES)) @@ -1017,12 +1035,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Propagate as XCLAIM. */ if (spi) { robj *idarg = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); + streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack,external); decrRefCount(idarg); } } else { if (propagate_last_id) - streamPropagateGroupID(c,spi->keyname,group,spi->groupname); + streamPropagateGroupID(c,spi->keyname,group,spi->groupname,external); } arraylen++; @@ -1045,8 +1063,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * This function is more expensive because it needs to inspect the PEL and then * seek into the radix tree of the messages in order to emit the full message * to the client. However clients only reach this code path when they are - * fetching the history of already retrieved messages, which is rare. */ -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { + * fetching the history of already retrieved messages, which is rare. + * + * See streamReplyWithRange() for the meaning of the last argument + * 'external'. This function is only called from streamReplyWithRange(). */ +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer, int external) { raxIterator ri; unsigned char startkey[sizeof(streamID)]; unsigned char endkey[sizeof(streamID)]; @@ -1062,7 +1083,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start streamID thisid; streamDecodeID(ri.key,&thisid); if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL) == 0) + STREAM_RWR_RAWENTRIES,NULL,external) == 0) { /* Note that we may have a not acknowledged entry in the PEL * about a message that's no longer here because was removed @@ -1324,7 +1345,7 @@ void xrangeGenericCommand(client *c, int rev) { addReplyNullArray(c); } else { if (count == -1) count = 0; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,0); } } @@ -1563,7 +1584,7 @@ void xreadCommand(client *c) { if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, flags, &spi); + consumer, flags, &spi, 0); if (groups) server.dirty++; } } @@ -2341,19 +2362,19 @@ void xclaimCommand(client *c) { addReplyStreamID(c,&id); } else { size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, - NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); + NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,0); if (!emitted) addReplyNull(c); } arraylen++; /* Propagate this change. */ - streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack,0); propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ server.dirty++; } } if (propagate_last_id) { - streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); + streamPropagateGroupID(c,c->argv[1],group,c->argv[2],0); server.dirty++; } setDeferredArrayLen(c,arraylenptr,arraylen); @@ -2586,11 +2607,11 @@ NULL end.ms = end.seq = UINT64_MAX; addReplyBulkCString(c,"first-entry"); count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,0); if (!count) addReplyNull(c); addReplyBulkCString(c,"last-entry"); count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + STREAM_RWR_RAWENTRIES,NULL,0); if (!count) addReplyNull(c); } else { addReplySubcommandSyntaxError(c); -- GitLab