未验证 提交 2e7d7cba 编写于 作者: M Michael Grunder 提交者: GitHub

Resp3 oob push support (#841)

Proper support for RESP3 PUSH messages.

By default, PUSH messages are now intercepted and the reply memory freed.  
This means existing code should work unchanged when connecting to Redis
>= 6.0.0 even if `CLIENT TRACKING` were then enabled.

Additionally, we define two callbacks users can configure if they wish to handle
these messages in a custom way:

void redisPushFn(void *privdata, void *reply);
void redisAsyncPushFn(redisAsyncContext *ac, void *reply);

See #825
上级 1864e76e
...@@ -6,3 +6,4 @@ ...@@ -6,3 +6,4 @@
/*.a /*.a
/*.pc /*.pc
*.dSYM *.dSYM
tags
...@@ -24,6 +24,8 @@ before_script: ...@@ -24,6 +24,8 @@ before_script:
addons: addons:
apt: apt:
sources:
- sourceline: 'ppa:chris-lea/redis-server'
packages: packages:
- libc6-dbg - libc6-dbg
- libc6-dev - libc6-dev
...@@ -35,6 +37,7 @@ addons: ...@@ -35,6 +37,7 @@ addons:
- libssl-dev - libssl-dev
- libssl-dev:i386 - libssl-dev:i386
- valgrind - valgrind
- redis
env: env:
- BITS="32" - BITS="32"
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
SSL_OBJ=ssl.o SSL_OBJ=ssl.o
EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
ifeq ($(USE_SSL),1) ifeq ($(USE_SSL),1)
EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl
endif endif
...@@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME) ...@@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME) hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)
ifndef AE_DIR ifndef AE_DIR
hiredis-example-ae: hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)" @echo "Please specify AE_DIR (e.g. <redis repository>/src)"
...@@ -195,6 +196,9 @@ endif ...@@ -195,6 +196,9 @@ endif
hiredis-example: examples/example.c $(STLIBNAME) hiredis-example: examples/example.c $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
hiredis-example-push: examples/example-push.c $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
examples: $(EXAMPLES) examples: $(EXAMPLES)
TEST_LIBS = $(STLIBNAME) TEST_LIBS = $(STLIBNAME)
......
[![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis) [![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis)
**This Readme reflects the latest changed in the master branch. See [v0.13.3](https://github.com/redis/hiredis/tree/v0.13.3) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).** **This Readme reflects the latest changed in the master branch. See [v0.14.1](https://github.com/redis/hiredis/tree/v0.14.1) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**
# HIREDIS # HIREDIS
...@@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) { ...@@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) {
} }
``` ```
## RESP3 PUSH replies
Redis 6.0 introduced PUSH replies with the reply-type `>`. These messages are generated spontaneously and can arrive at any time, so must be handled using callbacks.
### Default behavior
Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected. This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`.
### Custom PUSH handler prototypes
The callback prototypes differ between `redisContext` and `redisAsyncContext`.
#### redisContext
```c
void my_push_handler(void *privdata, void *reply) {
/* Handle the reply */
/* Note: We need to free the reply in our custom handler for
blocking contexts. This lets us keep the reply if
we want. */
freeReplyObject(reply);
}
```
#### redisAsyncContext
```c
void my_async_push_handler(redisAsyncContext *ac, void *reply) {
/* Handle the reply */
/* Note: Because async hiredis always frees replies, you should
not call freeReplyObject in an async push callback. */
}
```
### Installing a custom handler
There are two ways to set your own PUSH handlers.
1. Set `push_cb` or `async_push_cb` in the `redisOptions` struct and connect with `redisConnectWithOptions` or `redisAsyncConnectWithOptions`.
```c
redisOptions = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
options->push_cb = my_push_handler;
redisContext *context = redisConnectWithOptions(&options);
```
2. Call `redisSetPushCallback` or `redisAsyncSetPushCallback` on a connected context.
```c
redisContext *context = redisConnect("127.0.0.1", 6379);
redisSetPushCallback(context, my_push_handler);
```
_Note `redisSetPushCallback` and `redisAsyncSetPushCallback` both return any currently configured handler, making it easy to override and then return to the old value._
### Specifying no handler
If you have a unique use-case where you don't want hiredis to automatically intercept and free PUSH replies, you will want to configure no handler at all. This can be done in two ways.
1. Set the `REDIS_OPT_NO_PUSH_AUTOFREE` flag in `redisOptions` and leave the callback function pointer `NULL`.
```c
redisOptions = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
options->options |= REDIS_OPT_NO_PUSH_AUTOFREE;
redisContext *context = redisConnectWithOptions(&options);
```
3. Call `redisSetPushCallback` with `NULL` once connected.
```c
redisContext *context = redisConnect("127.0.0.1", 6379);
redisSetPushCallback(context, NULL);
```
_Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._
## Allocator injection ## Allocator injection
Hiredis uses a pass-thru structure of function pointers defined in Hiredis uses a pass-thru structure of function pointers defined in [alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that contain the currently configured allocation and deallocation functions. By default they just point to libc (`malloc`, `calloc`, `realloc`, etc).
[alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that conttain
the currently configured allocation and deallocation functions. By default they
just point to libc (`malloc`, `calloc`, `realloc`, etc).
### Overriding ### Overriding
...@@ -532,5 +595,6 @@ hiredisResetAllocators(); ...@@ -532,5 +595,6 @@ hiredisResetAllocators();
## AUTHORS ## AUTHORS
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and Hiredis was written by Salvatore Sanfilippo (antirez at gmail),
Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license. Pieter Noordhuis (pcnoordhuis at gmail), and Michael Grunder
(michael dot grunder at gmail) and is released under the BSD license.
...@@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) { ...@@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
redisContext *c; redisContext *c;
redisAsyncContext *ac; redisAsyncContext *ac;
/* Clear any erroneously set sync callback and flag that we don't want to
* use freeReplyObject by default. */
myOptions.push_cb = NULL;
myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
myOptions.options |= REDIS_OPT_NONBLOCK; myOptions.options |= REDIS_OPT_NONBLOCK;
c = redisConnectWithOptions(&myOptions); c = redisConnectWithOptions(&myOptions);
if (c == NULL) { if (c == NULL) {
return NULL; return NULL;
} }
ac = redisAsyncInitialize(c); ac = redisAsyncInitialize(c);
if (ac == NULL) { if (ac == NULL) {
redisFree(c); redisFree(c);
return NULL; return NULL;
} }
/* Set any configured async push handler */
redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
__redisAsyncCopyError(ac); __redisAsyncCopyError(ac);
return ac; return ac;
} }
...@@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe ...@@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
} }
} }
static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
if (ac->push_cb != NULL) {
ac->c.flags |= REDIS_IN_CALLBACK;
ac->push_cb(ac, reply);
ac->c.flags &= ~REDIS_IN_CALLBACK;
}
}
/* Helper function to free the context. */ /* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) { static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
...@@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { ...@@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL); __redisRunCallback(ac,&cb,NULL);
/* Run subscription callbacks callbacks with NULL reply */ /* Run subscription callbacks with NULL reply */
if (ac->sub.channels) { if (ac->sub.channels) {
it = dictGetIterator(ac->sub.channels); it = dictGetIterator(ac->sub.channels);
if (it != NULL) { if (it != NULL) {
...@@ -459,6 +477,30 @@ oom: ...@@ -459,6 +477,30 @@ oom:
return REDIS_ERR; return REDIS_ERR;
} }
#define redisIsSpontaneousPushReply(r) \
(redisIsPushReply(r) && !redisIsSubscribeReply(r))
static int redisIsSubscribeReply(redisReply *reply) {
char *str;
size_t len, off;
/* We will always have at least one string with the subscribe/message type */
if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
reply->element[0]->len < sizeof("message") - 1)
{
return 0;
}
/* Get the string/len moving past 'p' if needed */
off = tolower(reply->element[0]->str[0]) == 'p';
str = reply->element[0]->str + off;
len = reply->element[0]->len - off;
return !strncasecmp(str, "subscribe", len) ||
!strncasecmp(str, "message", len);
}
void redisProcessCallbacks(redisAsyncContext *ac) { void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL}; redisCallback cb = {NULL, NULL, 0, NULL};
...@@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) { ...@@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break; break;
} }
/* Even if the context is subscribed, pending regular callbacks will /* Send any non-subscribe related PUSH messages to our PUSH handler
* get a reply before pub/sub messages arrive. */ * while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
* either RESP2 or RESP3 mode. */
if (redisIsSpontaneousPushReply(reply)) {
__redisRunPushCallback(ac, reply);
c->reader->fn->freeObject(reply);
continue;
}
/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/* /*
* A spontaneous reply in a not-subscribed context can be the error * A spontaneous reply in a not-subscribed context can be the error
...@@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void ...@@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
return status; return status;
} }
redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
redisAsyncPushFn *old = ac->push_cb;
ac->push_cb = fn;
return old;
}
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
if (!ac->c.timeout) { if (!ac->c.timeout) {
ac->c.timeout = hi_calloc(1, sizeof(tv)); ac->c.timeout = hi_calloc(1, sizeof(tv));
......
...@@ -106,6 +106,9 @@ typedef struct redisAsyncContext { ...@@ -106,6 +106,9 @@ typedef struct redisAsyncContext {
struct dict *channels; struct dict *channels;
struct dict *patterns; struct dict *patterns;
} sub; } sub;
/* Any configured RESP3 PUSH handler */
redisAsyncPushFn *push_cb;
} redisAsyncContext; } redisAsyncContext;
/* Functions that proxy to hiredis */ /* Functions that proxy to hiredis */
...@@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path); ...@@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path);
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn);
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv); int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);
void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac); void redisAsyncFree(redisAsyncContext *ac);
......
...@@ -44,3 +44,6 @@ ENDIF() ...@@ -44,3 +44,6 @@ ENDIF()
ADD_EXECUTABLE(example example.c) ADD_EXECUTABLE(example example.c)
TARGET_LINK_LIBRARIES(example hiredis) TARGET_LINK_LIBRARIES(example hiredis)
ADD_EXECUTABLE(example-push example-push.c)
TARGET_LINK_LIBRARIES(example-push hiredis)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis.h>
#include <win32.h>
#define KEY_COUNT 5
#define panicAbort(fmt, ...) \
do { \
fprintf(stderr, "%s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, __VA_ARGS__); \
exit(-1); \
} while (0)
static void assertReplyAndFree(redisContext *context, redisReply *reply, int type) {
if (reply == NULL)
panicAbort("NULL reply from server (error: %s)", context->errstr);
if (reply->type != type) {
if (reply->type == REDIS_REPLY_ERROR)
fprintf(stderr, "Redis Error: %s\n", reply->str);
panicAbort("Expected reply type %d but got type %d", type, reply->type);
}
freeReplyObject(reply);
}
/* Switch to the RESP3 protocol and enable client tracking */
static void enableClientTracking(redisContext *c) {
redisReply *reply = redisCommand(c, "HELLO 3");
if (reply == NULL || c->err) {
panicAbort("NULL reply or server error (error: %s)", c->errstr);
}
if (reply->type != REDIS_REPLY_MAP) {
fprintf(stderr, "Error: Can't send HELLO 3 command. Are you sure you're ");
fprintf(stderr, "connected to redis-server >= 6.0.0?\nRedis error: %s\n",
reply->type == REDIS_REPLY_ERROR ? reply->str : "(unknown)");
exit(-1);
}
freeReplyObject(reply);
/* Enable client tracking */
reply = redisCommand(c, "CLIENT TRACKING ON");
assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
}
void pushReplyHandler(void *privdata, void *r) {
redisReply *reply = r;
int *invalidations = privdata;
/* Sanity check on the invalidation reply */
if (reply->type != REDIS_REPLY_PUSH || reply->elements != 2 ||
reply->element[1]->type != REDIS_REPLY_ARRAY ||
reply->element[1]->element[0]->type != REDIS_REPLY_STRING)
{
panicAbort("%s", "Can't parse PUSH message!");
}
/* Increment our invalidation count */
*invalidations += 1;
printf("pushReplyHandler(): INVALIDATE '%s' (invalidation count: %d)\n",
reply->element[1]->element[0]->str, *invalidations);
freeReplyObject(reply);
}
int main(int argc, char **argv) {
unsigned int j, invalidations = 0;
redisContext *c;
redisReply *reply;
const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";
int port = (argc > 2) ? atoi(argv[2]) : 6379;
redisOptions o = {0};
REDIS_OPTIONS_SET_TCP(&o, hostname, port);
/* Set our custom PUSH message handler */
o.push_cb = pushReplyHandler;
c = redisConnectWithOptions(&o);
if (c == NULL || c->err)
panicAbort("Connection error: %s", c ? c->errstr : "OOM");
/* Enable RESP3 and turn on client tracking */
enableClientTracking(c);
/* Set our context privdata to the address of our invalidation counter. Each
* time our PUSH handler is called, hiredis will pass the privdata for context */
c->privdata = &invalidations;
/* Set some keys and then read them back. Once we do that, Redis will deliver
* invalidation push messages whenever the key is modified */
for (j = 0; j < KEY_COUNT; j++) {
reply = redisCommand(c, "SET key:%d initial:%d", j, j);
assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
reply = redisCommand(c, "GET key:%d", j);
assertReplyAndFree(c, reply, REDIS_REPLY_STRING);
}
/* Trigger invalidation messages by updating keys we just read */
for (j = 0; j < KEY_COUNT; j++) {
printf(" main(): SET key:%d update:%d\n", j, j);
reply = redisCommand(c, "SET key:%d update:%d", j, j);
assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
printf(" main(): SET REPLY OK\n");
}
printf("\nTotal detected invalidations: %d, expected: %d\n", invalidations, KEY_COUNT);
/* PING server */
redisFree(c);
}
...@@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) { ...@@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) {
parent = task->parent->obj; parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY || assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP || parent->type == REDIS_REPLY_MAP ||
parent->type == REDIS_REPLY_SET); parent->type == REDIS_REPLY_SET ||
parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r; parent->element[task->idx] = r;
} }
return r; return r;
...@@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) { ...@@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) {
return redisReaderCreateWithFunctions(&defaultFunctions); return redisReaderCreateWithFunctions(&defaultFunctions);
} }
static void redisPushAutoFree(void *privdata, void *reply) {
(void)privdata;
freeReplyObject(reply);
}
static redisContext *redisContextInit(const redisOptions *options) { static redisContext *redisContextInit(const redisOptions *options) {
redisContext *c; redisContext *c;
...@@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) { ...@@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) {
return NULL; return NULL;
c->funcs = &redisContextDefaultFuncs; c->funcs = &redisContextDefaultFuncs;
/* Set any user supplied RESP3 PUSH handler or use freeReplyObject
* as a default unless specifically flagged that we don't want one. */
if (options->push_cb != NULL)
redisSetPushCallback(c, options->push_cb);
else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
redisSetPushCallback(c, redisPushAutoFree);
c->obuf = sdsempty(); c->obuf = sdsempty();
c->reader = redisReaderCreate(); c->reader = redisReaderCreate();
c->fd = REDIS_INVALID_FD; c->fd = REDIS_INVALID_FD;
...@@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { ...@@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) {
c->flags |= REDIS_REUSEADDR; c->flags |= REDIS_REUSEADDR;
} }
if (options->options & REDIS_OPT_NOAUTOFREE) { if (options->options & REDIS_OPT_NOAUTOFREE) {
c->flags |= REDIS_NO_AUTO_FREE; c->flags |= REDIS_NO_AUTO_FREE;
} }
if (options->type == REDIS_CONN_TCP) { if (options->type == REDIS_CONN_TCP) {
...@@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) { ...@@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) {
return REDIS_OK; return REDIS_OK;
} }
/* Set a user provided RESP3 PUSH handler and return any old one set. */
redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) {
redisPushFn *old = c->push_cb;
c->push_cb = fn;
return old;
}
/* Use this function to handle a read event on the descriptor. It will try /* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser. * and read some bytes from the socket and feed them to the reply parser.
* *
...@@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) { ...@@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) {
__redisSetError(c,c->reader->err,c->reader->errstr); __redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR; return REDIS_ERR;
} }
return REDIS_OK; return REDIS_OK;
} }
/* Internal helper that returns 1 if the reply was a RESP3 PUSH
* message and we handled it with a user-provided callback. */
static int redisHandledPushReply(redisContext *c, void *reply) {
if (reply && c->push_cb && redisIsPushReply(reply)) {
c->push_cb(c->privdata, reply);
return 1;
}
return 0;
}
int redisGetReply(redisContext *c, void **reply) { int redisGetReply(redisContext *c, void **reply) {
int wdone = 0; int wdone = 0;
void *aux = NULL; void *aux = NULL;
...@@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) { ...@@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) {
do { do {
if (redisBufferRead(c) == REDIS_ERR) if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR; return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR; /* We loop here in case the user has specified a RESP3
* PUSH handler (e.g. for client tracking). */
do {
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (redisHandledPushReply(c, aux));
} while (aux == NULL); } while (aux == NULL);
} }
......
...@@ -92,6 +92,15 @@ typedef long long ssize_t; ...@@ -92,6 +92,15 @@ typedef long long ssize_t;
* SO_REUSEADDR is being used. */ * SO_REUSEADDR is being used. */
#define REDIS_CONNECT_RETRIES 10 #define REDIS_CONNECT_RETRIES 10
/* Forward declarations for structs defined elsewhere */
struct redisAsyncContext;
struct redisContext;
/* RESP3 push helpers and callback prototypes */
#define redisIsPushReply(r) (((redisReply*)(r))->type == REDIS_REPLY_PUSH)
typedef void (redisPushFn)(void *, void *);
typedef void (redisAsyncPushFn)(struct redisAsyncContext *, void *);
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -140,6 +149,9 @@ struct redisSsl; ...@@ -140,6 +149,9 @@ struct redisSsl;
*/ */
#define REDIS_OPT_NOAUTOFREE 0x04 #define REDIS_OPT_NOAUTOFREE 0x04
/* Don't automatically intercept and free RESP3 PUSH replies. */
#define REDIS_OPT_NO_PUSH_AUTOFREE 0x08
/* In Unix systems a file descriptor is a regular signed int, with -1 /* In Unix systems a file descriptor is a regular signed int, with -1
* representing an invalid descriptor. In Windows it is a SOCKET * representing an invalid descriptor. In Windows it is a SOCKET
* (32- or 64-bit unsigned integer depending on the architecture), where * (32- or 64-bit unsigned integer depending on the architecture), where
...@@ -180,6 +192,10 @@ typedef struct { ...@@ -180,6 +192,10 @@ typedef struct {
* file descriptor */ * file descriptor */
redisFD fd; redisFD fd;
} endpoint; } endpoint;
/* A user defined PUSH message callback */
redisPushFn *push_cb;
redisAsyncPushFn *async_push_cb;
} redisOptions; } redisOptions;
/** /**
...@@ -194,9 +210,6 @@ typedef struct { ...@@ -194,9 +210,6 @@ typedef struct {
(opts)->type = REDIS_CONN_UNIX; \ (opts)->type = REDIS_CONN_UNIX; \
(opts)->endpoint.unix_socket = path; (opts)->endpoint.unix_socket = path;
struct redisAsyncContext;
struct redisContext;
typedef struct redisContextFuncs { typedef struct redisContextFuncs {
void (*free_privdata)(void *); void (*free_privdata)(void *);
void (*async_read)(struct redisAsyncContext *); void (*async_read)(struct redisAsyncContext *);
...@@ -235,6 +248,9 @@ typedef struct redisContext { ...@@ -235,6 +248,9 @@ typedef struct redisContext {
/* Additional private data for hiredis addons such as SSL */ /* Additional private data for hiredis addons such as SSL */
void *privdata; void *privdata;
/* An optional RESP3 PUSH handler */
redisPushFn *push_cb;
} redisContext; } redisContext;
redisContext *redisConnectWithOptions(const redisOptions *options); redisContext *redisConnectWithOptions(const redisOptions *options);
...@@ -261,6 +277,7 @@ redisContext *redisConnectFd(redisFD fd); ...@@ -261,6 +277,7 @@ redisContext *redisConnectFd(redisFD fd);
*/ */
int redisReconnect(redisContext *c); int redisReconnect(redisContext *c);
redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn);
int redisSetTimeout(redisContext *c, const struct timeval tv); int redisSetTimeout(redisContext *c, const struct timeval tv);
int redisEnableKeepAlive(redisContext *c); int redisEnableKeepAlive(redisContext *c);
void redisFree(redisContext *c); void redisFree(redisContext *c);
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include <limits.h> #include <limits.h>
#include "hiredis.h" #include "hiredis.h"
#include "async.h"
#ifdef HIREDIS_TEST_SSL #ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h" #include "hiredis_ssl.h"
#endif #endif
...@@ -77,6 +78,43 @@ static long long usec(void) { ...@@ -77,6 +78,43 @@ static long long usec(void) {
#define assert(e) (void)(e) #define assert(e) (void)(e)
#endif #endif
/* Helper to extract Redis version information. Aborts on any failure. */
#define REDIS_VERSION_FIELD "redis_version:"
void get_redis_version(redisContext *c, int *majorptr, int *minorptr) {
redisReply *reply;
char *eptr, *s, *e;
int major, minor;
reply = redisCommand(c, "INFO");
if (reply == NULL || c->err || reply->type != REDIS_REPLY_STRING)
goto abort;
if ((s = strstr(reply->str, REDIS_VERSION_FIELD)) == NULL)
goto abort;
s += strlen(REDIS_VERSION_FIELD);
/* We need a field terminator and at least 'x.y.z' (5) bytes of data */
if ((e = strstr(s, "\r\n")) == NULL || (e - s) < 5)
goto abort;
/* Extract version info */
major = strtol(s, &eptr, 10);
if (*eptr != '.') goto abort;
minor = strtol(eptr+1, NULL, 10);
/* Push info the caller wants */
if (majorptr) *majorptr = major;
if (minorptr) *minorptr = minor;
freeReplyObject(reply);
return;
abort:
freeReplyObject(reply);
fprintf(stderr, "Error: Cannot determine Redis version, aborting\n");
exit(1);
}
static redisContext *select_database(redisContext *c) { static redisContext *select_database(redisContext *c) {
redisReply *reply; redisReply *reply;
...@@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) { ...@@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) {
return c; return c;
} }
/* Switch protocol */
static void send_hello(redisContext *c, int version) {
redisReply *reply;
int expected;
reply = redisCommand(c, "HELLO %d", version);
expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY;
assert(reply != NULL && reply->type == expected);
freeReplyObject(reply);
}
/* Togggle client tracking */
static void send_client_tracking(redisContext *c, const char *str) {
redisReply *reply;
reply = redisCommand(c, "CLIENT TRACKING %s", str);
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
}
static int disconnect(redisContext *c, int keep_fd) { static int disconnect(redisContext *c, int keep_fd) {
redisReply *reply; redisReply *reply;
...@@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) { ...@@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) {
#endif #endif
} }
/* Dummy push handler */
void push_handler(void *privdata, void *reply) {
int *counter = privdata;
freeReplyObject(reply);
*counter += 1;
}
/* Dummy function just to test setting a callback with redisOptions */
void push_handler_async(redisAsyncContext *ac, void *reply) {
(void)ac;
(void)reply;
}
static void test_resp3_push_handler(redisContext *c) {
redisPushFn *old = NULL;
redisReply *reply;
void *privdata;
int n = 0;
/* Switch to RESP3 and turn on client tracking */
send_hello(c, 3);
send_client_tracking(c, "ON");
privdata = c->privdata;
c->privdata = &n;
reply = redisCommand(c, "GET key:0");
assert(reply != NULL);
freeReplyObject(reply);
test("RESP3 PUSH messages are handled out of band by default: ");
reply = redisCommand(c, "SET key:0 val:0");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
assert((reply = redisCommand(c, "GET key:0")) != NULL);
freeReplyObject(reply);
old = redisSetPushCallback(c, push_handler);
test("We can set a custom RESP3 PUSH handler: ");
reply = redisCommand(c, "SET key:0 val:0");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && n == 1);
freeReplyObject(reply);
/* Unset the push callback and generate an invalidate message making
* sure it is not handled out of band. */
test("With no handler, PUSH replies come in-band: ");
redisSetPushCallback(c, NULL);
assert((reply = redisCommand(c, "GET key:0")) != NULL);
freeReplyObject(reply);
assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL);
test_cond(reply->type == REDIS_REPLY_PUSH);
freeReplyObject(reply);
test("With no PUSH handler, no replies are lost: ");
assert(redisGetReply(c, (void**)&reply) == REDIS_OK);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
/* Return to the originally set PUSH handler */
assert(old != NULL);
redisSetPushCallback(c, old);
/* Switch back to RESP2 and disable tracking */
c->privdata = privdata;
send_client_tracking(c, "OFF");
send_hello(c, 2);
}
redisOptions get_redis_tcp_options(struct config config) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
return options;
}
static void test_resp3_push_options(struct config config) {
redisAsyncContext *ac;
redisContext *c;
redisOptions options;
test("We set a default RESP3 handler for redisContext: ");
options = get_redis_tcp_options(config);
assert((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb != NULL);
redisFree(c);
test("We don't set a default RESP3 push handler for redisAsyncContext: ");
options = get_redis_tcp_options(config);
assert((ac = redisAsyncConnectWithOptions(&options)) != NULL);
test_cond(ac->c.push_cb == NULL);
redisAsyncFree(ac);
test("Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: ");
options = get_redis_tcp_options(config);
options.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
assert((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb == NULL);
redisFree(c);
test("We can use redisOptions to set a custom PUSH handler for redisContext: ");
options = get_redis_tcp_options(config);
options.push_cb = push_handler;
assert((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb == push_handler);
redisFree(c);
test("We can use redisOptions to set a custom PUSH handler for redisAsyncContext: ");
options = get_redis_tcp_options(config);
options.async_push_cb = push_handler_async;
assert((ac = redisAsyncConnectWithOptions(&options)) != NULL);
test_cond(ac->push_cb == push_handler_async);
redisAsyncFree(ac);
}
static void test_blocking_connection(struct config config) { static void test_blocking_connection(struct config config) {
redisContext *c; redisContext *c;
redisReply *reply; redisReply *reply;
int major;
c = do_connect(config); c = do_connect(config);
...@@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) { ...@@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) {
assert(redisAppendCommand(c, "PING") == REDIS_OK); assert(redisAppendCommand(c, "PING") == REDIS_OK);
test_cond(redisGetReply(c, NULL) == REDIS_OK); test_cond(redisGetReply(c, NULL) == REDIS_OK);
get_redis_version(c, &major, NULL);
if (major >= 6) test_resp3_push_handler(c);
test_resp3_push_options(config);
disconnect(c, 0); disconnect(c, 0);
} }
...@@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) { ...@@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) {
/* Connect to target given by config. */ /* Connect to target given by config. */
c = do_connect(config); c = do_connect(config);
{ get_redis_version(c, &major, &minor);
/* Find out Redis version to determine the path for the next test */
const char *field = "redis_version:";
char *p, *eptr;
reply = redisCommand(c,"INFO");
p = strstr(reply->str,field);
major = strtol(p+strlen(field),&eptr,10);
p = eptr+1; /* char next to the first "." */
minor = strtol(p,&eptr,10);
freeReplyObject(reply);
}
test("Returns I/O error when the connection is lost: "); test("Returns I/O error when the connection is lost: ");
reply = redisCommand(c,"QUIT"); reply = redisCommand(c,"QUIT");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册