From 9265ea6a08e516af7add628806a71b82eb2a6d4f Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 21 Oct 2020 12:22:19 +0800 Subject: [PATCH] ic-proxy: refresh peers on demand The user can adjust the ic-proxy peer addresses at runtime and reload by sending SIGHUP, if an address is modified or removed, the corresponding peer connection must be closed or reestablished. The same to the peer listener, if the listener port is changed, then must re-setup the listener. --- src/backend/cdb/motion/ic_proxy_main.c | 100 +++++++++++++++++++++++ src/backend/cdb/motion/ic_proxy_peer.c | 21 +++++ src/backend/cdb/motion/ic_proxy_server.h | 1 + 3 files changed, 122 insertions(+) diff --git a/src/backend/cdb/motion/ic_proxy_main.c b/src/backend/cdb/motion/ic_proxy_main.c index 750133d2b1..af0b2eb6f1 100644 --- a/src/backend/cdb/motion/ic_proxy_main.c +++ b/src/backend/cdb/motion/ic_proxy_main.c @@ -27,6 +27,8 @@ #include +static void ic_proxy_server_peer_listener_init(uv_loop_t *loop); + static uv_loop_t ic_proxy_server_loop; static uv_signal_t ic_proxy_server_signal_hup; static uv_signal_t ic_proxy_server_signal_int; @@ -36,6 +38,7 @@ static uv_timer_t ic_proxy_server_timer; static uv_tcp_t ic_proxy_peer_listener; static bool ic_proxy_peer_listening; +static bool ic_proxy_peer_relistening; static uv_pipe_t ic_proxy_client_listener; static bool ic_proxy_client_listening; @@ -55,6 +58,13 @@ ic_proxy_server_peer_listener_on_closed(uv_handle_t *handle) /* A new peer listener will be created on the next timer callback */ ic_proxy_peer_listening = false; + + /* If relisten is requested, do it now */ + if (ic_proxy_peer_relistening) + { + ic_proxy_peer_relistening = false; + ic_proxy_server_peer_listener_init(handle->loop); + } } /* @@ -200,6 +210,40 @@ ic_proxy_server_peer_listener_init(uv_loop_t *loop) ic_proxy_peer_listening = true; } +/* + * Reinit the peer listener. + */ +static void +ic_proxy_server_peer_listener_reinit(uv_loop_t *loop) +{ + const ICProxyAddr *myaddr = ic_proxy_get_my_addr(); + + if (ic_proxy_peer_relistening) + return; + + if (ic_proxy_peer_listening) + { + /* + * We are listening already, so must first close the current one, we + * keep the ic_proxy_peer_listening as true during the process to + * prevent double connect. + */ + ic_proxy_log(LOG, "ic-proxy-server: closing the legacy peer listener"); + + /* Only recreate a new listener if an address is assigned to us */ + ic_proxy_peer_relistening = !!myaddr; + + uv_close((uv_handle_t *) &ic_proxy_peer_listener, + ic_proxy_server_peer_listener_on_closed); + } + else if (myaddr) + { + /* Otherwise simply establish a new one */ + ic_proxy_peer_relistening = false; + ic_proxy_server_peer_listener_init(loop); + } +} + /* * The client listener is closed. */ @@ -353,6 +397,57 @@ ic_proxy_server_ensure_peers(uv_loop_t *loop) } } +/* + * Drop legacy peers. + * + * The list ic_proxy_removed_addrs contains both removed and updated addresses, + * the corresponding peers must be disconnected before taking further actions. + */ +static void +ic_proxy_server_drop_legacy_peers(uv_loop_t *loop) +{ + ListCell *cell; + const ICProxyAddr *myaddr = ic_proxy_get_my_addr(); + + /* + * Also take the chance to check the peer listener. + * + * If myaddr cannot be found at all, the address must have been removed, + * close the current listener. + */ + if (!myaddr) + ic_proxy_server_peer_listener_reinit(loop); + + foreach(cell, ic_proxy_removed_addrs) + { + ICProxyAddr *addr = lfirst(cell); + ICProxyPeer *peer; + + /* + * Also take the chance to check the peer listener. + * + * If myaddr appears in the removed list, then the address must have + * been changed or removed, no need to compare the sockaddrs again. + */ + if (myaddr && myaddr->dbid == addr->dbid) + ic_proxy_server_peer_listener_reinit(loop); + + /* + * Refer to ic_proxy_server_ensure_peers() on why we need below checks. + */ + if (addr->content >= GpIdentity.segindex) + continue; + if (addr->dbid == GpIdentity.dbid) + continue; /* do not connect to my primary / mirror */ + + peer = ic_proxy_peer_lookup(addr->content, addr->dbid); + if (!peer) + continue; + + ic_proxy_peer_disconnect(peer); + } +} + /* * Timer handler. * @@ -383,6 +478,7 @@ ic_proxy_server_on_signal(uv_signal_t *handle, int signum) ProcessConfigFile(PGC_SIGHUP); ic_proxy_reload_addresses(handle->loop); + ic_proxy_server_drop_legacy_peers(handle->loop); ic_proxy_server_peer_listener_init(handle->loop); ic_proxy_server_ensure_peers(handle->loop); @@ -434,6 +530,7 @@ ic_proxy_server_main(void) ic_proxy_client_table_init(); ic_proxy_peer_listening = false; + ic_proxy_peer_relistening = false; ic_proxy_client_listening = false; uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_hup); @@ -510,6 +607,9 @@ ic_proxy_server_quit(uv_loop_t *loop, bool relaunch) */ if (ic_proxy_peer_listening) { + /* cancel pending relistening request */ + ic_proxy_peer_relistening = false; + uv_unref((uv_handle_t *) &ic_proxy_peer_listener); uv_close((uv_handle_t *) &ic_proxy_peer_listener, NULL); } diff --git a/src/backend/cdb/motion/ic_proxy_peer.c b/src/backend/cdb/motion/ic_proxy_peer.c index 783bc92f6c..bbdd0be7f0 100644 --- a/src/backend/cdb/motion/ic_proxy_peer.c +++ b/src/backend/cdb/motion/ic_proxy_peer.c @@ -813,6 +813,27 @@ ic_proxy_peer_connect(ICProxyPeer *peer, struct sockaddr_in *dest) ic_proxy_peer_on_connected); } +/* + * Disconnect a peer. + * + * The peer can be in any state, the caller only needs to ensure not to call + * this function from a peer callback. + */ +void +ic_proxy_peer_disconnect(ICProxyPeer *peer) +{ + /* No such a peer yet */ + if (!peer) + return; + + /* No connection is made or being made */ + if (!(peer->state & IC_PROXY_PEER_STATE_CONNECTING)) + return; + + ic_proxy_log(LOG, "%s: disconnecting", peer->name); + ic_proxy_peer_shutdown(peer); +} + /* * Send a packet to a remote peer. */ diff --git a/src/backend/cdb/motion/ic_proxy_server.h b/src/backend/cdb/motion/ic_proxy_server.h index c51541a099..194e7186ab 100644 --- a/src/backend/cdb/motion/ic_proxy_server.h +++ b/src/backend/cdb/motion/ic_proxy_server.h @@ -127,6 +127,7 @@ extern ICProxyPeer *ic_proxy_peer_new(uv_loop_t *loop, extern void ic_proxy_peer_free(ICProxyPeer *peer); extern void ic_proxy_peer_read_hello(ICProxyPeer *peer); extern void ic_proxy_peer_connect(ICProxyPeer *peer, struct sockaddr_in *dest); +extern void ic_proxy_peer_disconnect(ICProxyPeer *peer); extern void ic_proxy_peer_route_data(ICProxyPeer *peer, ICProxyPkt *pkt, ic_proxy_sent_cb callback, void *opaque); extern ICProxyPeer *ic_proxy_peer_lookup(int16 content, uint16 dbid); -- GitLab