From 96a54866ab4694cf338af0441f28aa69e9643376 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 8 Apr 2020 12:55:57 +0200 Subject: [PATCH] Speedup: unblock clients on keys in O(1). See #7071. --- src/adlist.c | 20 +++++++++++++++++--- src/adlist.h | 3 ++- src/blocked.c | 48 ++++++++++++++++++++++++++++++------------------ src/server.c | 2 +- 4 files changed, 50 insertions(+), 23 deletions(-) diff --git a/src/adlist.c b/src/adlist.c index ec5f8bbf4..0fedc0729 100644 --- a/src/adlist.c +++ b/src/adlist.c @@ -327,12 +327,11 @@ listNode *listIndex(list *list, long index) { } /* Rotate the list removing the tail node and inserting it to the head. */ -void listRotate(list *list) { - listNode *tail = list->tail; - +void listRotateTailToHead(list *list) { if (listLength(list) <= 1) return; /* Detach current tail */ + listNode *tail = list->tail; list->tail = tail->prev; list->tail->next = NULL; /* Move it as head */ @@ -342,6 +341,21 @@ void listRotate(list *list) { list->head = tail; } +/* Rotate the list removing the head node and inserting it to the tail. */ +void listRotateHeadToTail(list *list) { + if (listLength(list) <= 1) return; + + listNode *head = list->head; + /* Detach current head */ + list->head = head->next; + list->head->prev = NULL; + /* Move it as tail */ + list->tail->next = head; + head->next = NULL; + head->prev = list->tail; + list->tail = head; +} + /* Add all the elements of the list 'o' at the end of the * list 'l'. The list 'other' remains empty but otherwise valid. */ void listJoin(list *l, list *o) { diff --git a/src/adlist.h b/src/adlist.h index 28b9016ce..dd8a8d693 100644 --- a/src/adlist.h +++ b/src/adlist.h @@ -85,7 +85,8 @@ listNode *listSearchKey(list *list, void *key); listNode *listIndex(list *list, long index); void listRewind(list *list, listIter *li); void listRewindTail(list *list, listIter *li); -void listRotate(list *list); +void listRotateTailToHead(list *list); +void listRotateHeadToTail(list *list); void listJoin(list *l, list *o); /* Directions for iterators */ diff --git a/src/blocked.c b/src/blocked.c index e3a803ae3..00cc798d5 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -64,6 +64,21 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); +/* This structure represents the blocked key information that we store + * in the client structure. Each client blocked on keys, has a + * client->bpop.keys hash table. The keys of the hash table are Redis + * keys pointers to 'robj' structures. The value is this structure. + * The structure has two goals: firstly we store the list node that this + * client uses to be listed in the database "blocked clients for this key" + * list, so we can later unblock in O(1) without a list scan. + * Secondly for certain blocking types, we have additional info. Right now + * the only use for additional info we have is when clients are blocked + * on streams, as we have to remember the ID it blocked for. */ +typedef struct bkinfo { + listNode *listnode; /* List node for db->blocking_keys[key] list. */ + streamID stream_id; /* Stream ID if we blocked in a stream. */ +} bkinfo; + /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ @@ -211,8 +226,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -273,8 +287,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { if (receiver->btype != BLOCKED_ZSET) { /* Put at the tail, so that at the next call * we'll not run into it again. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); continue; } @@ -320,8 +333,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { while((ln = listNext(&li))) { client *receiver = listNodeValue(ln); if (receiver->btype != BLOCKED_STREAM) continue; - streamID *gt = dictFetchValue(receiver->bpop.keys, - rl->key); + bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); + streamID *gt = &bki->stream_id; /* If we blocked in the context of a consumer * group, we need to resolve the group and update the @@ -419,8 +432,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * ready to be served, so they'll remain in the list * sometimes. We want also be able to skip clients that are * not blocked for the MODULE type safely. */ - listDelNode(clients,clientnode); - listAddNodeTail(clients,receiver); + listRotateHeadToTail(clients); if (receiver->btype != BLOCKED_MODULE) continue; @@ -551,17 +563,15 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { - /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists and sorted sets, or the stream ID for streams. */ - void *key_data = NULL; - if (btype == BLOCKED_STREAM) { - key_data = zmalloc(sizeof(streamID)); - memcpy(key_data,ids+j,sizeof(streamID)); - } + /* Allocate our bkinfo structure, associated to each key the client + * is blocked for. */ + bkinfo *bki = zmalloc(sizeof(*bki)); + if (btype == BLOCKED_STREAM) + bki->stream_id = ids[j]; /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) { - zfree(key_data); + if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { + zfree(bki); continue; } incrRefCount(keys[j]); @@ -580,6 +590,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo l = dictGetVal(de); } listAddNodeTail(l,c); + bki->listnode = listLast(l); } blockClient(c,btype); } @@ -596,11 +607,12 @@ void unblockClientWaitingData(client *c) { /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); + bkinfo *bki = dictGetVal(de); /* Remove this client from the list of clients waiting for this key. */ l = dictFetchValue(c->db->blocking_keys,key); serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,listSearchKey(l,c)); + listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) dictDelete(c->db->blocking_keys,key); diff --git a/src/server.c b/src/server.c index 996e0f5d2..0afd67514 100644 --- a/src/server.c +++ b/src/server.c @@ -1666,7 +1666,7 @@ void clientsCron(void) { /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ - listRotate(server.clients); + listRotateTailToHead(server.clients); head = listFirst(server.clients); c = listNodeValue(head); /* The following functions do different service checks on the client. -- GitLab