From 3b38164e8fe8627ae10536e3cfee5520983a1888 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 3 Oct 2019 11:03:46 +0200 Subject: [PATCH] Modules: RM_Replicate() in thread safe contexts. --- src/module.c | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/module.c b/src/module.c index dcb5c9c98..8126e2749 100644 --- a/src/module.c +++ b/src/module.c @@ -1372,6 +1372,20 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { * * Please refer to RedisModule_Call() for more information. * + * ## Note about calling this function from a thread safe context: + * + * Normally when you call this function from the callback implementing a + * module command, or any other callback provided by the Redis Module API, + * Redis will accumulate all the calls to this function in the context of + * the callback, and will propagate all the commands wrapped in a MULTI/EXEC + * transaction. However when calling this function from a threaded safe context + * that can live an undefined amount of time, and can be locked/unlocked in + * at will, the behavior is different: MULTI/EXEC wrapper is not emitted + * and the command specified is inserted in the AOF and replication stream + * immediately. + * + * ## Return value + * * The command returns REDISMODULE_ERR if the format specifiers are invalid * or the command name does not belong to a known command. */ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) { @@ -1389,10 +1403,18 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) va_end(ap); if (argv == NULL) return REDISMODULE_ERR; - /* Replicate! */ - moduleReplicateMultiIfNeeded(ctx); - alsoPropagate(cmd,ctx->client->db->id,argv,argc, - PROPAGATE_AOF|PROPAGATE_REPL); + /* Replicate! When we are in a threaded context, we want to just insert + * the replicated command ASAP, since it is not clear when the context + * will stop being used, so accumulating stuff does not make much sense, + * nor we could easily use the alsoPropagate() API from threads. */ + if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) { + propagate(cmd,ctx->client->db->id,argv,argc, + PROPAGATE_AOF|PROPAGATE_REPL); + } else { + moduleReplicateMultiIfNeeded(ctx); + alsoPropagate(cmd,ctx->client->db->id,argv,argc, + PROPAGATE_AOF|PROPAGATE_REPL); + } /* Release the argv. */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); -- GitLab