提交 6188fb1f 编写于 作者: N Ning Yu

ic-proxy: implement the core logic

The interconnect proxy mode, a.k.a. ic-proxy, is a new interconnect
mode in which all the backends communicate via a proxy bgworker. All the
backends on the same segment share the same proxy bgworker, so every two
segments only need one network connection between them, which reduces
the number of network flows as well as the number of ports.

To enable the proxy mode we need to first configure the guc
gp_interconnect_proxy_addresses, for example:

    gpconfig \
      -c gp_interconnect_proxy_addresses \
      -v "'1:-1:10.0.0.1:2000,2:0:10.0.0.2:2001,3:1:10.0.0.3:2002'" \
      --skipvalidation

Then restart to take effect.
上级 8804bf39
...@@ -722,6 +722,7 @@ GREP ...@@ -722,6 +722,7 @@ GREP
with_apr_config with_apr_config
with_libcurl with_libcurl
with_rt with_rt
with_libuv
with_quicklz with_quicklz
with_zstd with_zstd
with_libbz2 with_libbz2
...@@ -887,6 +888,7 @@ with_zlib ...@@ -887,6 +888,7 @@ with_zlib
with_libbz2 with_libbz2
with_zstd with_zstd
with_quicklz with_quicklz
with_libuv
with_rt with_rt
with_libcurl with_libcurl
with_apr_config with_apr_config
...@@ -1593,6 +1595,7 @@ Optional Packages: ...@@ -1593,6 +1595,7 @@ Optional Packages:
--without-zstd do not build with Zstandard --without-zstd do not build with Zstandard
--with-quicklz build with QuickLZ support (requires quicklz --with-quicklz build with QuickLZ support (requires quicklz
library) library)
--without-libuv do not use libuv
--without-rt do not use Realtime Library --without-rt do not use Realtime Library
--without-libcurl do not use libcurl --without-libcurl do not use libcurl
--with-apr-config=PATH path to apr-1-config utility --with-apr-config=PATH path to apr-1-config utility
...@@ -8067,6 +8070,35 @@ fi ...@@ -8067,6 +8070,35 @@ fi
#
# libuv. Used for ic-proxy
#
# Check whether --with-libuv was given.
if test "${with_libuv+set}" = set; then :
withval=$with_libuv;
case $withval in
yes)
:
;;
no)
:
;;
*)
as_fn_error $? "no argument expected for --with-libuv option" "$LINENO" 5
;;
esac
else
with_libuv=yes
fi
# #
# Realtime library # Realtime library
# #
...@@ -11651,6 +11683,56 @@ fi ...@@ -11651,6 +11683,56 @@ fi
fi fi
if test "$with_libuv" = yes; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for uv_default_loop in -luv" >&5
$as_echo_n "checking for uv_default_loop in -luv... " >&6; }
if ${ac_cv_lib_uv_uv_default_loop+:} false; then :
$as_echo_n "(cached) " >&6
else
ac_check_lib_save_LIBS=$LIBS
LIBS="-luv $LIBS"
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char uv_default_loop ();
int
main ()
{
return uv_default_loop ();
;
return 0;
}
_ACEOF
if ac_fn_c_try_link "$LINENO"; then :
ac_cv_lib_uv_uv_default_loop=yes
else
ac_cv_lib_uv_uv_default_loop=no
fi
rm -f core conftest.err conftest.$ac_objext \
conftest$ac_exeext conftest.$ac_ext
LIBS=$ac_check_lib_save_LIBS
fi
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_uv_uv_default_loop" >&5
$as_echo "$ac_cv_lib_uv_uv_default_loop" >&6; }
if test "x$ac_cv_lib_uv_uv_default_loop" = xyes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_LIBUV 1
_ACEOF
LIBS="-luv $LIBS"
else
as_fn_error $? "libuv library not found." "$LINENO" 5
fi
fi
if test "$enable_spinlocks" = yes; then if test "$enable_spinlocks" = yes; then
$as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
......
...@@ -1152,6 +1152,13 @@ PGAC_ARG_BOOL(with, quicklz, no, ...@@ -1152,6 +1152,13 @@ PGAC_ARG_BOOL(with, quicklz, no,
[build with QuickLZ support (requires quicklz library)]) [build with QuickLZ support (requires quicklz library)])
AC_SUBST(with_quicklz) AC_SUBST(with_quicklz)
#
# libuv. Used for ic-proxy
#
PGAC_ARG_BOOL(with, libuv, yes,
[do not use libuv])
AC_SUBST(with_libuv)
# #
# Realtime library # Realtime library
# #
...@@ -1465,6 +1472,11 @@ if test "$with_quicklz" = yes; then ...@@ -1465,6 +1472,11 @@ if test "$with_quicklz" = yes; then
[AC_MSG_ERROR([quicklz library not found.])]) [AC_MSG_ERROR([quicklz library not found.])])
fi fi
if test "$with_libuv" = yes; then
AC_CHECK_LIB(uv, uv_default_loop, [],
[AC_MSG_ERROR([libuv library not found.])])
fi
if test "$enable_spinlocks" = yes; then if test "$enable_spinlocks" = yes; then
AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.]) AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
else else
......
...@@ -247,6 +247,7 @@ YAML_LIBS = @YAML_LIBS@ ...@@ -247,6 +247,7 @@ YAML_LIBS = @YAML_LIBS@
with_zstd = @with_zstd@ with_zstd = @with_zstd@
with_quicklz = @with_quicklz@ with_quicklz = @with_quicklz@
EVENT_LIBS = @EVENT_LIBS@ EVENT_LIBS = @EVENT_LIBS@
with_libuv = @with_libuv@
########################################################################## ##########################################################################
......
...@@ -200,6 +200,15 @@ bool gp_interconnect_log_stats = false; /* emit stats at log-level */ ...@@ -200,6 +200,15 @@ bool gp_interconnect_log_stats = false; /* emit stats at log-level */
bool gp_interconnect_cache_future_packets = true; bool gp_interconnect_cache_future_packets = true;
/*
* format: dbid:content:address:port,dbid:content:address:port ...
* example: 1:-1:10.0.0.1:2000 2:0:10.0.0.2:2000 3:1:10.0.0.2:2001
*
* FIXME: at the moment:
* - the address must be specified as IP;
*/
char *gp_interconnect_proxy_addresses = NULL;
int Gp_udp_bufsize_k; /* UPD recv buf size, in KB */ int Gp_udp_bufsize_k; /* UPD recv buf size, in KB */
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
......
...@@ -15,4 +15,23 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) ...@@ -15,4 +15,23 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = cdbmotion.o tupchunklist.o tupser.o \ OBJS = cdbmotion.o tupchunklist.o tupser.o \
ic_common.o ic_tcp.o ic_udpifc.o htupfifo.o tupleremap.o ic_common.o ic_tcp.o ic_udpifc.o htupfifo.o tupleremap.o
ifeq ($(with_libuv),yes)
# server
OBJS += ic_proxy_bgworker.o
OBJS += ic_proxy_main.o
OBJS += ic_proxy_client.o
OBJS += ic_proxy_peer.o
OBJS += ic_proxy_router.o
# backend
OBJS += ic_proxy_backend.o
# utils
OBJS += ic_proxy_addr.o
OBJS += ic_proxy_key.o
OBJS += ic_proxy_packet.o
OBJS += ic_proxy_pkt_cache.o
OBJS += ic_proxy_iobuf.o
endif # with_libuv
include $(top_srcdir)/src/backend/common.mk include $(top_srcdir)/src/backend/common.mk
此差异已折叠。
/*-------------------------------------------------------------------------
*
* ic_proxy.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_H
#define IC_PROXY_H
#include "postgres.h"
#include "cdb/cdbinterconnect.h"
#include "cdb/cdbvars.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
/* NOTE(review): presumably the listen() backlog of the proxy listeners -- confirm at the call site */
#define IC_PROXY_BACKLOG 1024
/*
 * Marker for an invalid content id.  Note the uint16 cast: the macro expands
 * to 65534, not to -2, when it is stored in a wider signed integer.
 */
#define IC_PROXY_INVALID_CONTENT ((uint16) -2)
/* Marker for an invalid dbid. */
#define IC_PROXY_INVALID_DBID ((int16) 0)
/*
 * NOTE(review): pause / resume thresholds, presumably used for flow control;
 * the unit is not visible in this header -- confirm against the users.
 */
#define IC_PROXY_TRESHOLD_PAUSE 4
#define IC_PROXY_TRESHOLD_RESUME 2
/*
 * The minimum elevel that ic_proxy_log() actually emits.  It can be
 * overridden by defining IC_PROXY_LOG_LEVEL before this header is included.
 */
#ifndef IC_PROXY_LOG_LEVEL
#define IC_PROXY_LOG_LEVEL LOG
#endif
/* Memory helpers, thin wrappers around palloc() / pfree(). */
#define ic_proxy_alloc(size) palloc(size)
#define ic_proxy_free(ptr) pfree(ptr)
/* Allocate room for one object of the given type; the memory is not zeroed. */
#define ic_proxy_new(type) ic_proxy_alloc(sizeof(type))
/*
 * Log a message with elog(), but only when "elevel" is at least
 * IC_PROXY_LOG_LEVEL.
 *
 * "elevel" is expanded twice, so it must not have side effects.  Also note
 * that a message below IC_PROXY_LOG_LEVEL is skipped entirely, including the
 * non-local exit that elog() would perform for ERROR and above.
 */
#define ic_proxy_log(elevel, msg...) do { \
if (elevel >= IC_PROXY_LOG_LEVEL) \
{ \
elog(elevel, msg); \
} \
} while (0)
/*
 * Write the path of the proxy's domain socket into "buf".
 *
 * The postmaster port is embedded in the path, so that the proxies of
 * different segments, or even of different clusters, can coexist on the same
 * host without sharing a path.
 *
 * At most "bufsize" bytes are written, including the terminating NUL.
 */
static inline void
ic_proxy_build_server_sock_path(char *buf, size_t bufsize)
{
	const int	postmaster_port = PostPortNumber;

	snprintf(buf, bufsize, "/tmp/.s.proxy.%d", postmaster_port);
}
#endif /* IC_PROXY_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_addr.c
*
* Interconnect Proxy Addresses
*
* Maintain the address information of all the proxies, which is set by the GUC
* gp_interconnect_proxy_addresses.
*
* FIXME: currently that GUC can not be reloaded with "gpstop -u", so we must
* restart the cluster to update the setting. This causes problems during
* online expansion, when new segments are added to the cluster, we must update
* this GUC to include their information, so until the cluster is restarted all
* the ic-proxy mode queries will hang.
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include <uv.h>
#include "ic_proxy.h"
#include "ic_proxy_addr.h"
/*
* List<ICProxyAddr *>, the addresses list.
*/
List *ic_proxy_addrs;
/*
* Reload the addresses from the GUC gp_interconnect_proxy_addresses.
*
* The caller is responsible to load the up-to-date setting of that GUC by
* calling ProcessConfigFile().
*/
void
ic_proxy_reload_addresses(void)
{
int max_content_id;
int uniq_content_count;
/* reset the old addresses */
{
ListCell *cell;
foreach(cell, ic_proxy_addrs)
{
ic_proxy_free(lfirst(cell));
}
list_free(ic_proxy_addrs);
ic_proxy_addrs = NIL;
}
max_content_id = IC_PROXY_INVALID_CONTENT;
/* parse the new addresses */
{
int size = strlen(gp_interconnect_proxy_addresses) + 1;
char *buf;
FILE *f;
int dbid;
int content;
int port;
char ip[HOST_NAME_MAX];
buf = ic_proxy_alloc(size);
memcpy(buf, gp_interconnect_proxy_addresses, size);
f = fmemopen(buf, size, "r");
/*
* format: dbid:segid:ip:port
*/
while (fscanf(f, "%d:%d:%[0-9.]:%d,", &dbid, &content, ip, &port) == 4)
{
ICProxyAddr *addr = ic_proxy_new(ICProxyAddr);
int ret;
addr->dbid = dbid;
addr->content = content;
ic_proxy_log(LOG, "ic-proxy-server: addr: seg%d,dbid%d: %s:%d",
content, dbid, ip, port);
ret = uv_ip4_addr(ip, port, (struct sockaddr_in *) addr);
if (ret < 0)
ic_proxy_log(WARNING,
"ic-proxy-server: invalid address: seg%d,dbid%d: %s:%d: %s",
content, dbid, ip, port, uv_strerror(ret));
ic_proxy_addrs = lappend(ic_proxy_addrs, addr);
max_content_id = Max(max_content_id, content);
}
fclose(f);
ic_proxy_free(buf);
}
/*
* We have found the max content id, convert it to a count by adding 2, as
* content ids are counted from -1.
*/
uniq_content_count = max_content_id + 2;
ic_proxy_log(LOG, "ic-proxy-server: %d unique content ids",
uniq_content_count);
}
/*
 * Look up the proxy port of the current segment.
 *
 * The address list is searched for the entry whose dbid equals
 * GpIdentity.dbid, the port of the first match is returned.
 *
 * Return -1, after a WARNING, if no entry matches.
 */
int
ic_proxy_get_my_port(void)
{
	const int	my_dbid = GpIdentity.dbid;
	ListCell   *lc;

	foreach(lc, ic_proxy_addrs)
	{
		ICProxyAddr *candidate = lfirst(lc);

		if (candidate->dbid != my_dbid)
			continue;

		return ic_proxy_addr_get_port(candidate);
	}

	ic_proxy_log(WARNING, "ic-proxy-addr: cannot get my port");
	return -1;
}
/*
 * Extract the port, in host byte-order, from an address.
 *
 * Both AF_INET and AF_INET6 addresses are understood.
 *
 * Return -1, after a WARNING, for any other address family.
 */
int
ic_proxy_addr_get_port(const ICProxyAddr *addr)
{
	switch (addr->addr.ss_family)
	{
		case AF_INET:
			{
				const struct sockaddr_in *in4 =
					(const struct sockaddr_in *) &addr->addr;

				return ntohs(in4->sin_port);
			}

		case AF_INET6:
			{
				const struct sockaddr_in6 *in6 =
					(const struct sockaddr_in6 *) &addr->addr;

				return ntohs(in6->sin6_port);
			}

		default:
			break;
	}

	ic_proxy_log(WARNING,
				 "ic-proxy-addr: invalid address family %d for seg%d,dbid%d",
				 addr->addr.ss_family, addr->content, addr->dbid);
	return -1;
}
/*-------------------------------------------------------------------------
*
* ic_proxy_addr.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_ADDR_H
#define IC_PROXY_ADDR_H
/*
 * The network address of one proxy, together with the identity of the segment
 * it belongs to.
 */
typedef struct ICProxyAddr ICProxyAddr;
struct ICProxyAddr
{
/*
 * The socket address.  It must stay the first member: ic_proxy_addr.c casts
 * an ICProxyAddr pointer directly to struct sockaddr_in / sockaddr_in6.
 */
struct sockaddr_storage addr;
/* the dbid of the segment, as specified in the GUC */
int dbid;
/* the content id of the segment, as specified in the GUC */
int content;
};
/*
* List<ICProxyAddr *>
*/
extern List *ic_proxy_addrs;
extern void ic_proxy_reload_addresses(void);
extern int ic_proxy_get_my_port(void);
extern int ic_proxy_addr_get_port(const ICProxyAddr *addr);
#endif /* IC_PROXY_ADDR_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_backend.c
*
* Interconnect Proxy Backend
*
* The functions in this file are all called by the backends.
*
 * A backend and a client are the two endpoints of a domain socket: the
 * backend is in the QD/QE backend process, the client is in the proxy
 * bgworker process.
*
* TODO: at the moment the ic-tcp backend logic is reused by ic-proxy, in fact
* the ic-proxy logic just lives inside ic_tcp.c, but in the future we may want
* to build the ic-proxy logic independently, the potential benefits are:
* - concurrent connection establishments: in current ic-tcp based
* implementation we establish the connections to the proxy one by one, it
* causes higher latency; in the new logic we could do them concurrently;
* - send / receive as ICProxyPkt directly: ic-tcp sends with a 4-byte header,
* in the proxy bgworker we need to unpack them and repack them into
* ICProxyPkt packets; the receiver needs to do it reversely; if we send /
* receive as ICProxyPkt directly we could reduce the parsing and memory
* copying;
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbvars.h"
#include "executor/execdesc.h"
#include "ic_proxy.h"
#include "ic_proxy_backend.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_key.h"
#include <unistd.h>
/*
 * Connect to the local proxy and finish the hand shaking.
 *
 * A domain socket is connected to the proxy of the current segment, the
 * connecting is retried every 100ms, without an upper limit, until it
 * succeeds.  Then a HELLO message built from the logical connection key is
 * sent, and a reply of sizeof(ICProxyPkt) bytes is waited for.
 *
 * Return the connected fd, or -1 if the proxy closed the connection before
 * the complete reply was received.  An ERROR is raised on socket(), send() or
 * recv() failures, the fd is closed before raising so it is not leaked.
 *
 * FIXME: this function is to make the debugging easier, however as it
 * establishes the connection, as well as the registration, one by one, it has
 * bad performance.
 */
int
ic_proxy_backend_connect(ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
	struct sockaddr_un addr;
	ICProxyKey	key;
	ICProxyPkt	pkt;
	int			fd;
	int			ret;
	int			save_errno;
	char	   *data;
	int			size;

	ic_proxy_key_init(&key,						/* key */
					  gp_session_id,			/* sessionId */
					  gp_command_count,			/* commandId */
					  pEntry->sendSlice->sliceIndex,	/* sendSliceIndex */
					  pEntry->recvSlice->sliceIndex,	/* recvSliceIndex */
					  GpIdentity.segindex,		/* localContentId */
					  GpIdentity.dbid,			/* localDbid */
					  MyProcPid,				/* localPid */
					  conn->cdbProc->contentid, /* remoteContentId */
					  conn->cdbProc->dbid,		/* remoteDbid */
					  conn->cdbProc->pid);		/* remotePid */

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	ic_proxy_build_server_sock_path(addr.sun_path, sizeof(addr.sun_path));

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == PGINVALID_SOCKET)
		ic_proxy_log(ERROR, "ic-proxy-backend: fail to create a socket: %m");

	/* the proxy might not be ready yet, keep retrying until connected */
	do
	{
		ret = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
		if (ret == -1 && errno == EINTR)
			continue;

		if (ret == -1)
		{
			ic_proxy_log(LOG, "ic-proxy-backend: fail to connect to %s: %m",
						 addr.sun_path);
			pg_usleep(100000);
			continue;
		}
	} while (ret == -1);

	ic_proxy_message_init(&pkt, IC_PROXY_MESSAGE_HELLO, &key);

	/* send HELLO */
	data = (char *) &pkt;
	size = pkt.len;
	while (size > 0)
	{
		do
		{
			ret = send(fd, data, size, 0);
		} while (ret == -1 && errno == EINTR);

		if (ret == -1)
		{
			/*
			 * The ERROR does not return to us, so release the fd first;
			 * errno is preserved for the %m in the message.
			 */
			save_errno = errno;
			close(fd);
			errno = save_errno;

			ic_proxy_log(ERROR, "ic-proxy-backend: fail to send HELLO: %m");
		}

		data += ret;
		size -= ret;
	}

	/* recv HELLO ACK */
	data = (char *) &pkt;
	size = sizeof(pkt);
	while (size > 0)
	{
		do
		{
			ret = recv(fd, data, size, 0);
		} while (ret == -1 && errno == EINTR);

		if (ret == -1)
		{
			/* same as above, do not leak the fd on ERROR */
			save_errno = errno;
			close(fd);
			errno = save_errno;

			ic_proxy_log(ERROR, "ic-proxy-backend: fail to recv HELLO ACK: %m");
		}
		else if (ret == 0)
		{
			/* the proxy closed the connection, let the caller handle it */
			ic_proxy_log(LOG, "ic-proxy-backend: the peer is not ready");
			shutdown(fd, SHUT_RDWR);
			close(fd);
			fd = -1;
			break;
		}

		data += ret;
		size -= ret;
	}

	return fd;
}
/*-------------------------------------------------------------------------
*
* ic_proxy_backend.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_BACKEND_H
#define IC_PROXY_BACKEND_H
#include "postgres.h"
#include "cdb/cdbinterconnect.h"
extern int ic_proxy_backend_connect(ChunkTransportStateEntry *pEntry,
MotionConn *conn);
#endif /* IC_PROXY_BACKEND_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_bgworker.c
*
* Interconnect Proxy Background Worker
*
* This is only a wrapper, the actual main loop is in ic_proxy_main.c .
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/ipc.h"
#include "cdb/ic_proxy_bgworker.h"
#include "ic_proxy_server.h"
/*
 * ICProxyStartRule
 *
 * Unconditionally report true; the argument is not used.
 */
bool
ICProxyStartRule(Datum main_arg)
{
	(void) main_arg;

	return true;
}
/*
 * ICProxyMain
 *
 * Run the proxy server main loop, then exit the process with the value the
 * main loop returned.
 */
void
ICProxyMain(Datum main_arg)
{
	int			exit_code;

	/* main loop */
	exit_code = ic_proxy_server_main();

	proc_exit(exit_code);
}
此差异已折叠。
/*-------------------------------------------------------------------------
*
* ic_proxy_iobuf.c
*
* Interconnect Proxy I/O Buffer
*
* An ibuf detects the boundaries of input packets and generate a callback for
* every complete packet.
*
* An obuf combines small output packets into large ones, which helps to reduce
* the overhead of packet headers.
*
* Both i/o bufs are designed to support any kind of packets, as long as they
* have a fixed-size packet header, and the maximum packet size does not exceed
* IC_PROXY_MAX_PKT_SIZE. Two built-in packet formats are provided:
*
 * - b2c packet: the ic-tcp packet, which contains a 4-byte header;
* - p2p packet: the ic-proxy packet, which contains a 32-byte header;
*
* Other formats can be supported by providing custom methods.
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "cdb/ml_ipc.h" /* for ic-tcp packet */
#include "ic_proxy.h"
#include "ic_proxy_iobuf.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_pkt_cache.h"
/*
 * Initialize an ibuf.
 *
 * The packet format must have a fixed-size header, its size is given by
 * "header_size".  Once a complete header has been received, the
 * "get_packet_size" method is called on it to parse the size of the whole
 * packet, header and body included.
 *
 * No buffer is allocated here.
 */
void
ic_proxy_ibuf_init(ICProxyIBuf *ibuf, uint16 header_size,
				   uint16 (* get_packet_size) (const void *data))
{
	/* the packet format */
	ibuf->header_size = header_size;
	ibuf->get_packet_size = get_packet_size;

	/* the buffer state, empty and not allocated */
	ibuf->buf = NULL;
	ibuf->len = 0;
}
/*
 * Uninitialize an ibuf.
 *
 * The buffer, if allocated, is returned to the packet cache.
 */
void
ic_proxy_ibuf_uninit(ICProxyIBuf *ibuf)
{
	if (ibuf->buf == NULL)
		return;

	ic_proxy_pkt_cache_free(ibuf->buf);
	ibuf->buf = NULL;
}
/*
 * Clear the ibuf.
 *
 * Any unconsumed bytes are dropped, a WARNING reports how many.
 */
void
ic_proxy_ibuf_clear(ICProxyIBuf *ibuf)
{
	const uint16 pending = ibuf->len;

	if (pending != 0)
		ic_proxy_log(WARNING, "ic-proxy-ibuf: dropped %d bytes", pending);

	ibuf->len = 0;
}
/*
* Return true if the ibuf is empty.
*/
bool
ic_proxy_ibuf_empty(const ICProxyIBuf *ibuf)
{
return ibuf->len == 0;
}
/*
 * Push data to the ibuf.
 *
 * The "data" and "size" are the pointer and the size of the data, they are
 * appended to the "ibuf".  Whenever a complete packet is composed the
 * "callback" is called with the "opaque" as the callback data.  Complete
 * packets found directly in "data" are passed to the "callback" in place,
 * without being copied; only the bytes of an incomplete packet are copied
 * into the ibuf, to be completed by later pushes.
 *
 * If "size" is 0 then a force flush is triggered, the "callback" is called
 * with the incomplete packet.
 *
 * A parsed packet size that is smaller than the header size can never be
 * valid, as the packet size includes the header.  It is reported with an
 * ERROR, the buffered bytes are dropped; continuing with such a size would
 * either copy a bogus amount of bytes or loop forever.
 *
 * TODO: libuv supports sending an array of buffers, so it is technically
 * possible to prevent the memory copying in this function, we only need to let
 * the get_packet_size() method support parsing the packet size from an array
 * of buffers.  However the callback function also needs to support
 * non-contiguous packets, an easy way is to only copy the header into
 * contiguous memory.
 */
void
ic_proxy_ibuf_push(ICProxyIBuf *ibuf,
				   const char *data, uint16 size,
				   ic_proxy_iobuf_data_callback callback,
				   void *opaque)
{
	uint16		packet_size;
	uint16		delta;

	/* the buffer is allocated lazily, on the first push */
	if (unlikely(ibuf->buf == NULL))
		ibuf->buf = ic_proxy_pkt_cache_alloc(NULL);

	/* a force-flush */
	if (unlikely(size == 0))
	{
		/* TODO: do we need to flush if ibuf->len is 0? */
		callback(opaque, ibuf->buf, ibuf->len);
		ibuf->len = 0;
		return;
	}

	/* first try to complete the packet left by the previous pushes */
	if (ibuf->len > 0)
	{
		if (ibuf->len < ibuf->header_size)
		{
			/* haven't got a complete header yet */
			delta = Min(ibuf->header_size - ibuf->len, size);
			memcpy(ibuf->buf + ibuf->len, data, delta);
			ibuf->len += delta;
			data += delta;
			size -= delta;

			if (ibuf->len < ibuf->header_size)
				/* still not having a complete header */
				return;
		}

		/* have a complete header now */
		packet_size = ibuf->get_packet_size(ibuf->buf);
		if (unlikely(packet_size < ibuf->header_size))
		{
			/* drop the corrupted bytes before reporting */
			ibuf->len = 0;
			ic_proxy_log(ERROR,
						 "ic-proxy-ibuf: invalid packet size %d, it is smaller than the %d bytes header",
						 packet_size, ibuf->header_size);
			return;
		}

		delta = Min(packet_size - ibuf->len, size);
		memcpy(ibuf->buf + ibuf->len, data, delta);
		ibuf->len += delta;
		data += delta;
		size -= delta;

		if (ibuf->len < packet_size)
			/* still not having a complete packet */
			return;

		/* have a complete pkt now */
		callback(opaque, ibuf->buf, packet_size);
		ibuf->len = 0;
	}

	/* then consume the complete packets in place */
	while (size >= ibuf->header_size)
	{
		packet_size = ibuf->get_packet_size(data);
		if (unlikely(packet_size < ibuf->header_size))
		{
			ic_proxy_log(ERROR,
						 "ic-proxy-ibuf: invalid packet size %d, it is smaller than the %d bytes header",
						 packet_size, ibuf->header_size);
			return;
		}

		if (packet_size <= size)
		{
			/* got a complete pkt */
			callback(opaque, data, packet_size);
			data += packet_size;
			size -= packet_size;
		}
		else
			/* got an incomplete pkt */
			break;
	}

	if (size > 0)
	{
		/* got an incomplete pkt, save it for the later pushes */
		memcpy(ibuf->buf, data, size);
		ibuf->len = size;
	}
}
/*
 * Get the packet size of a b2c one.
 *
 * The b2c packet only contains a 4-byte packet length in host byte-order.
 *
 * The "data" can point to any offset of a receive buffer, so it is not
 * necessarily aligned for an uint32; the length is copied out with memcpy()
 * instead of being read via a casted pointer.  The value is truncated to
 * uint16 by the return type, as before.
 */
static uint16
ic_proxy_ibuf_get_packet_size_b2c(const void *data)
{
	uint32		len;

	memcpy(&len, data, sizeof(len));

	return len;
}
/*
 * Initialize an ibuf for the b2c packets, whose header is PACKET_HEADER_SIZE
 * bytes.
 */
void
ic_proxy_ibuf_init_b2c(ICProxyIBuf *ibuf)
{
	const uint16 header_size = PACKET_HEADER_SIZE;

	ic_proxy_ibuf_init(ibuf, header_size, ic_proxy_ibuf_get_packet_size_b2c);
}
/*
 * Get the packet size of a p2p one.
 *
 * The p2p packet contains a 32-byte ICProxyPkt header, all the fields are in
 * host byte-order.
 *
 * The "data" can point to any offset of a receive buffer, so it is not
 * necessarily aligned for an ICProxyPkt; the header is copied to a local
 * variable before the length is read from it.
 */
static uint16
ic_proxy_ibuf_get_packet_size_p2p(const void *data)
{
	ICProxyPkt	header;

	memcpy(&header, data, sizeof(header));

	return header.len;
}
/*
 * Initialize an ibuf for the p2p packets, whose header is an ICProxyPkt.
 */
void
ic_proxy_ibuf_init_p2p(ICProxyIBuf *ibuf)
{
	const uint16 header_size = sizeof(ICProxyPkt);

	ic_proxy_ibuf_init(ibuf, header_size, ic_proxy_ibuf_get_packet_size_p2p);
}
/*
 * Initialize an obuf.
 *
 * The packet format must have a fixed-size header, its size is given by
 * "header_size".  When a packet is about to be sent, the "set_packet_size"
 * method is called to store the packet size in the header.
 *
 * No buffer is allocated here, ic_proxy_obuf_ensure_buffer() must be called
 * after the init and before any data is pushed.
 */
void
ic_proxy_obuf_init(ICProxyOBuf *obuf, uint16 header_size,
				   void (* set_packet_size) (void *data, uint16 size))
{
	/* the packet format */
	obuf->header_size = header_size;
	obuf->set_packet_size = set_packet_size;

	/* the buffer state, empty and not allocated */
	obuf->buf = NULL;
	obuf->len = 0;
}
/*
 * Uninitialize an obuf.
 *
 * The buffer, if allocated, is returned to the packet cache.
 */
void
ic_proxy_obuf_uninit(ICProxyOBuf *obuf)
{
	if (obuf->buf == NULL)
		return;

	ic_proxy_pkt_cache_free(obuf->buf);
	obuf->buf = NULL;
}
/*
 * Ensure that the obuf has allocated its buffer.
 *
 * The obuf does not allocate its buffer until this function is called, which
 * avoids the allocation for the obufs that are never used.  On the first
 * call the buffer is taken from the packet cache, and the room for the
 * header is reserved by setting the length to the header size.
 *
 * The buffer pointer is returned, the caller is responsible to initialize the
 * header.  The packet size, however, should be set in the set_packet_size()
 * method.
 *
 * TODO: This used to be useful to improve the performance slightly, however as
 * we have the packet cache already, we do not need this lazy allocation
 * anymore, we could retire it.
 */
void *
ic_proxy_obuf_ensure_buffer(ICProxyOBuf *obuf)
{
	char	   *buf = obuf->buf;

	if (unlikely(buf == NULL))
	{
		buf = ic_proxy_pkt_cache_alloc(NULL);

		obuf->buf = buf;
		obuf->len = obuf->header_size;
	}

	return buf;
}
/*
 * Push data to the obuf.
 *
 * The "data" and "size" describe the bytes to append.  A pushed chunk is
 * never split: if the buffer has no room for it, the bytes already in the
 * buffer are first handed to the "callback" as one packet, then the chunk is
 * stored behind the header for a later flush.
 *
 * If "size" is 0 then a force flush is triggered.
 *
 * The packet size is always stored in the header, via set_packet_size(),
 * before the packet is handed to the "callback".
 *
 * An ERROR is raised if the buffer has not been allocated with
 * ic_proxy_obuf_ensure_buffer(), or if the chunk can not fit in a packet even
 * when the buffer holds nothing but the header.
 */
void
ic_proxy_obuf_push(ICProxyOBuf *obuf,
				   const char *data, uint16 size,
				   ic_proxy_iobuf_data_callback callback,
				   void *opaque)
{
	bool		need_flush;

	if (unlikely(obuf->buf == NULL))
		ic_proxy_log(ERROR,
					 "ic-proxy-obuf: the caller must init the header before pushing data");

	/*
	 * Need a flush when:
	 * - size == 0 means a force flush;
	 * - or no enough space for the new data;
	 */
	need_flush = (size == 0 || size + obuf->len > IC_PROXY_MAX_PKT_SIZE);

	if (unlikely(need_flush))
	{
		const bool	has_payload = (obuf->len != obuf->header_size);

		if (obuf->header_size + size > IC_PROXY_MAX_PKT_SIZE)
			ic_proxy_log(ERROR,
						 "ic-proxy-obuf: no enough buffer to store the data:"
						 " the data size is %d bytes,"
						 " but the buffer size is only %zd bytes,"
						 " including a %d bytes header",
						 size, IC_PROXY_MAX_PKT_SIZE, obuf->header_size);

		if (has_payload)
		{
			obuf->set_packet_size(obuf->buf, obuf->len);
			callback(opaque, obuf->buf, obuf->len);

			/* the header stays in place to be reused by the next packet */
			obuf->len = obuf->header_size;
		}
		else
			/* TODO: should we flush if no data in the packet? */
			ic_proxy_log(LOG, "ic-proxy-obuf: no data to flush");
	}

	/* a force flush carries no data */
	if (size == 0)
		return;

	/* the new data will be sent by a later flush */
	memcpy(obuf->buf + obuf->len, data, size);
	obuf->len += size;
}
/*
 * Store the packet size in the header of a b2c packet.
 *
 * The b2c header is nothing but a 4-byte packet length in host byte-order.
 */
static void
ic_proxy_obuf_set_packet_size_b2c(void *data, uint16 size)
{
	uint32	   *header = data;

	*header = size;
}
/*
 * Initialize an obuf for the b2c packets, whose header is PACKET_HEADER_SIZE
 * bytes.
 */
void
ic_proxy_obuf_init_b2c(ICProxyOBuf *obuf)
{
	const uint16 header_size = PACKET_HEADER_SIZE;

	ic_proxy_obuf_init(obuf, header_size, ic_proxy_obuf_set_packet_size_b2c);
}
/*
 * Store the packet size in the header of a p2p packet.
 *
 * The p2p header is a 32-byte ICProxyPkt, all the fields are in host
 * byte-order; only the "len" field is touched here.
 */
static void
ic_proxy_obuf_set_packet_size_p2p(void *data, uint16 size)
{
	((ICProxyPkt *) data)->len = size;
}
/*
 * Initialize an obuf for the p2p packets, whose header is an ICProxyPkt.
 */
void
ic_proxy_obuf_init_p2p(ICProxyOBuf *obuf)
{
	const uint16 header_size = sizeof(ICProxyPkt);

	ic_proxy_obuf_init(obuf, header_size, ic_proxy_obuf_set_packet_size_p2p);
}
/*-------------------------------------------------------------------------
*
* ic_proxy_iobuf.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_IOBUF_H
#define IC_PROXY_IOBUF_H
#include "postgres.h"
typedef struct ICProxyOBuf ICProxyOBuf;
typedef struct ICProxyIBuf ICProxyIBuf;
typedef void (* ic_proxy_iobuf_data_callback) (void *opaque,
const void *data,
uint16 size);
/*
 * The input buffer, it detects the boundaries of the input packets.
 */
struct ICProxyIBuf
{
/*
 * Holds the bytes of an incomplete packet; it is allocated from the packet
 * cache by the first ic_proxy_ibuf_push() call, NULL before that.
 */
char *buf;
/* count of the bytes currently held in "buf" */
uint16 len;
/* size of the fixed-size packet header, in bytes */
uint16 header_size;
/* parse the size of the whole packet, header included, from a complete header */
uint16 (* get_packet_size) (const void *data);
};
/*
 * The output buffer, it combines the pushed data into larger packets.
 */
struct ICProxyOBuf
{
/*
 * The packet being composed, header included; it is allocated from the packet
 * cache by ic_proxy_obuf_ensure_buffer(), NULL before that.
 */
char *buf;
/* count of the bytes currently held in "buf", the header bytes are counted */
uint16 len;
/* size of the fixed-size packet header, in bytes */
uint16 header_size;
/* store the size of the whole packet in the header, called before a flush */
void (* set_packet_size) (void *data, uint16 size);
};
extern void ic_proxy_obuf_init(ICProxyOBuf *obuf, uint16 header_size,
void (* set_packet_size) (void *data,
uint16 size));
extern void ic_proxy_obuf_init_b2c(ICProxyOBuf *obuf);
extern void ic_proxy_obuf_init_p2p(ICProxyOBuf *obuf);
extern void ic_proxy_obuf_uninit(ICProxyOBuf *obuf);
extern void *ic_proxy_obuf_ensure_buffer(ICProxyOBuf *obuf);
extern void ic_proxy_obuf_push(ICProxyOBuf *obuf,
const char *data, uint16 size,
ic_proxy_iobuf_data_callback callback,
void *opaque);
extern void ic_proxy_ibuf_init(ICProxyIBuf *ibuf, uint16 header_size,
uint16 (* get_packet_size) (const void *data));
extern void ic_proxy_ibuf_init_b2c(ICProxyIBuf *ibuf);
extern void ic_proxy_ibuf_init_p2p(ICProxyIBuf *ibuf);
extern void ic_proxy_ibuf_uninit(ICProxyIBuf *ibuf);
extern void ic_proxy_ibuf_clear(ICProxyIBuf *ibuf);
extern bool ic_proxy_ibuf_empty(const ICProxyIBuf *ibuf);
extern void ic_proxy_ibuf_push(ICProxyIBuf *ibuf,
const char *data, uint16 size,
ic_proxy_iobuf_data_callback callback,
void *opaque);
#endif /* IC_PROXY_IOBUF_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_key.c
*
* Interconnect Proxy Key
*
* A key is actually a logical connection identifier, it identifies the sender,
* the receiver, as well as the logical connection itself.
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "ic_proxy_key.h"
/*
* Compares whether two keys are identical.
*
* Not all the attributes are compared. Content ids are not compared as the
* dbids are compared. Session ids are not compared because pids are compared.
*/
bool
ic_proxy_key_equal(const ICProxyKey *key1, const ICProxyKey *key2)
{
return key1->localDbid == key2->localDbid
&& key1->localPid == key2->localPid
&& key1->remoteDbid == key2->remoteDbid
&& key1->remotePid == key2->remotePid
&& key1->commandId == key2->commandId
&& key1->sendSliceIndex == key2->sendSliceIndex
&& key1->recvSliceIndex == key2->recvSliceIndex
;
}
/*
 * Hash function for keys.
 *
 * The two pids are xor-ed, then the remote dbid and the command id are
 * added.  The logic is derived from CONN_HASH_VALUE(); a better one with
 * fewer collisions could be built.
 *
 * The "keysize" is not used.
 */
uint32
ic_proxy_key_hash(const ICProxyKey *key, Size keysize)
{
	int32		mixed;

	mixed = key->localPid ^ key->remotePid;
	mixed = mixed + key->remoteDbid;

	return mixed + key->commandId;
}
/*
 * Match function for keys, with the calling convention of a hash table
 * compare function.
 *
 * Return 0 if the keys match, non-zero otherwise.  The "keysize" is not used.
 */
int
ic_proxy_key_equal_for_hash(const ICProxyKey *key1,
							const ICProxyKey *key2, Size keysize)
{
	if (ic_proxy_key_equal(key1, key2))
		return 0;

	return 1;
}
/*
 * Initialize a key.
 *
 * Every field of the key is set from the argument of the same name.
 */
void
ic_proxy_key_init(ICProxyKey *key,
				  int32 sessionId, uint32 commandId,
				  int16 sendSliceIndex, int16 recvSliceIndex,
				  int16 localContentId, uint16 localDbid, int32 localPid,
				  int16 remoteContentId, uint16 remoteDbid, int32 remotePid)
{
	/* the local end */
	key->localContentId = localContentId;
	key->localDbid = localDbid;
	key->localPid = localPid;

	/* the remote end */
	key->remoteContentId = remoteContentId;
	key->remoteDbid = remoteDbid;
	key->remotePid = remotePid;

	/* the query and the motion */
	key->sessionId = sessionId;
	key->commandId = commandId;
	key->sendSliceIndex = sendSliceIndex;
	key->recvSliceIndex = recvSliceIndex;
}
/*
 * Initialize a key from a peer-to-client packet.
 *
 * A peer-to-client packet is to a local client, so the destination of the
 * packet becomes the local end of the key, the source becomes the remote end.
 * The packet is usually from a remote peer, but it is also possible to be
 * from a different client on the same segment.
 */
void
ic_proxy_key_from_p2c_pkt(ICProxyKey *key, const ICProxyPkt *pkt)
{
	ic_proxy_key_init(key,
					  pkt->sessionId,		/* sessionId */
					  pkt->commandId,		/* commandId */
					  pkt->sendSliceIndex,	/* sendSliceIndex */
					  pkt->recvSliceIndex,	/* recvSliceIndex */
					  pkt->dstContentId,	/* localContentId */
					  pkt->dstDbid,			/* localDbid */
					  pkt->dstPid,			/* localPid */
					  pkt->srcContentId,	/* remoteContentId */
					  pkt->srcDbid,			/* remoteDbid */
					  pkt->srcPid);			/* remotePid */
}
/*
 * Initialize a key from a client-to-peer packet.
 *
 * A client-to-peer packet is from a local client, so the source of the packet
 * becomes the local end of the key, the destination becomes the remote end.
 * The packet is usually to a remote peer, but it is also possible to be to a
 * different client on the same segment.
 */
void
ic_proxy_key_from_c2p_pkt(ICProxyKey *key, const ICProxyPkt *pkt)
{
	ic_proxy_key_init(key,
					  pkt->sessionId,		/* sessionId */
					  pkt->commandId,		/* commandId */
					  pkt->sendSliceIndex,	/* sendSliceIndex */
					  pkt->recvSliceIndex,	/* recvSliceIndex */
					  pkt->srcContentId,	/* localContentId */
					  pkt->srcDbid,			/* localDbid */
					  pkt->srcPid,			/* localPid */
					  pkt->dstContentId,	/* remoteContentId */
					  pkt->dstDbid,			/* remoteDbid */
					  pkt->dstPid);			/* remotePid */
}
/*
 * Reverse the direction of a key.
 *
 * The local and the remote ends (content id, dbid and pid) are exchanged, all
 * the other fields are left untouched.  This converts a peer-to-client key to
 * a client-to-peer one, or the reverse.
 */
void
ic_proxy_key_reverse(ICProxyKey *key)
{
	const int16 content = key->localContentId;
	const uint16 dbid = key->localDbid;
	const int32 pid = key->localPid;

	/* the remote end becomes the local end */
	key->localContentId = key->remoteContentId;
	key->localDbid = key->remoteDbid;
	key->localPid = key->remotePid;

	/* the saved local end becomes the remote end */
	key->remoteContentId = content;
	key->remoteDbid = dbid;
	key->remotePid = pid;
}
/*
 * Build a describe string of a key.
 *
 * The string contains all the key information, can be used in log & error
 * messages.
 *
 * Return the string, which must not be freed.  The string is in a static
 * buffer, so a second call to this function will overwrite the result of the
 * previous call; for the same reason the result of two calls can not be used
 * in one log message.
 */
const char *
ic_proxy_key_to_str(const ICProxyKey *key)
{
	static char buf[256];

	/* the command id is unsigned, print it with %u instead of %d */
	snprintf(buf, sizeof(buf),
			 "[con%d,cmd%u,slice[%hd->%hd] seg%hd:dbid%hu:p%d->seg%hd:dbid%hu:p%d]",
			 key->sessionId, key->commandId,
			 key->sendSliceIndex, key->recvSliceIndex,
			 key->localContentId, key->localDbid, key->localPid,
			 key->remoteContentId, key->remoteDbid, key->remotePid);

	return buf;
}
/*-------------------------------------------------------------------------
*
* ic_proxy_key.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_KEY_H
#define IC_PROXY_KEY_H
#include "postgres.h"
typedef struct ICProxyKey ICProxyKey;
/* we have to include it after the declaration of ICProxyKey */
#include "ic_proxy_packet.h"
/*
 * The identifier of a logical connection.
 *
 * XXX: sessionId and {local,remote}ContentId are only for debugging purpose,
 * they are not actually part of the key: ic_proxy_key_equal() does not
 * compare them.
 *
 * dbid and segindex are both defined as int32 in type GpId, however we define
 * them as int16 to reduce the size of ICProxyKey, we also mark dbids as
 * unsigned in hope that the compiler could help to catch the errors if we pass
 * them in the wrong order.
 */
struct ICProxyKey
{
/* the local end: content id, dbid and pid */
int16 localContentId;
uint16 localDbid;
int32 localPid;
/* the remote end: content id, dbid and pid */
int16 remoteContentId;
uint16 remoteDbid;
int32 remotePid;
/* the session id, only for debugging purpose */
int32 sessionId;
/* the command id */
uint32 commandId;
/* the slice indexes of the sender and the receiver */
int16 sendSliceIndex;
int16 recvSliceIndex;
};
/*
 * Hashing and comparison of keys.
 *
 * NOTE(review): the (key, keysize) signatures look like hash table callbacks;
 * confirm against the implementation.
 */
extern uint32 ic_proxy_key_hash(const ICProxyKey *key, Size keysize);
extern bool ic_proxy_key_equal(const ICProxyKey *key1, const ICProxyKey *key2);
extern int ic_proxy_key_equal_for_hash(const ICProxyKey *key1,
const ICProxyKey *key2,
Size keysize);
/* Fill a key from the individual fields */
extern void ic_proxy_key_init(ICProxyKey *key,
int32 sessionId, uint32 commandId,
int16 sendSliceIndex, int16 recvSliceIndex,
int16 localContentId, uint16 localDbid, int32 localPid,
int16 remoteContentId, uint16 remoteDbid, int32 remotePid);
/* Fill a key from a peer-to-client (p2c) or a client-to-peer (c2p) packet */
extern void ic_proxy_key_from_p2c_pkt(ICProxyKey *key, const ICProxyPkt *pkt);
extern void ic_proxy_key_from_c2p_pkt(ICProxyKey *key, const ICProxyPkt *pkt);
/* Swap the local and the remote endpoints of a key in place */
extern void ic_proxy_key_reverse(ICProxyKey *key);
/* Describe a key; the result is in a static buffer and must not be freed */
extern const char *ic_proxy_key_to_str(const ICProxyKey *key);
#endif /* IC_PROXY_KEY_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_main.c
*
* The main loop of the ic-proxy. It listens for both new peers and new
* clients, and it also establishes the peer connections.
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "postmaster/bgworker.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "ic_proxy_server.h"
#include "ic_proxy_addr.h"
#include "ic_proxy_pkt_cache.h"
#include <uv.h>
#include <unistd.h>
/* The libuv event loop that drives the whole proxy server */
static uv_loop_t ic_proxy_server_loop;
/* One libuv signal handle per signal the server watches */
static uv_signal_t ic_proxy_server_signal_hup;
static uv_signal_t ic_proxy_server_signal_int;
static uv_signal_t ic_proxy_server_signal_term;
static uv_signal_t ic_proxy_server_signal_stop;
/* The periodic timer that (re)creates the listeners and the peer connections */
static uv_timer_t ic_proxy_server_timer;
/* The tcp listener for peers, and whether it is currently set up */
static uv_tcp_t ic_proxy_peer_listener;
static bool ic_proxy_peer_listening;
/* The domain socket listener for clients, and whether it is currently set up */
static uv_pipe_t ic_proxy_client_listener;
static bool ic_proxy_client_listening;
/* The value returned by ic_proxy_server_main(), see ic_proxy_server_quit() */
static int ic_proxy_server_exit_code = 1;
/*
 * libuv close callback of the peer listener.
 *
 * Once the flag is cleared, ic_proxy_server_peer_listener_init() no longer
 * returns early, so a fresh listener is set up the next time it is called,
 * e.g. from the timer callback.
 */
static void
ic_proxy_server_peer_listener_on_closed(uv_handle_t *handle)
{
	ic_proxy_peer_listening = false;

	ic_proxy_log(LOG, "ic-proxy-server: peer listener: closed");
}
/*
 * A new peer arrives, this is the libuv connection callback of the peer
 * listener.
 *
 * - server: the peer listener stream;
 * - status: 0 on success, or a negative libuv error code;
 *
 * On a listener error the listener is closed, its close callback marks it as
 * not listening so that it can be re-created later.  Otherwise the connection
 * is accepted as a new, not yet identified, peer and we begin to wait for its
 * HELLO message.
 */
static void
ic_proxy_server_on_new_peer(uv_stream_t *server, int status)
{
	ICProxyPeer *peer;
	int			ret;

	if (status < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: new peer error: %s",
					 uv_strerror(status));

		/* uv_close() must not be called twice on the same handle */
		if (!uv_is_closing((uv_handle_t *) server))
			uv_close((uv_handle_t *) server,
					 ic_proxy_server_peer_listener_on_closed);
		return;
	}

	ic_proxy_log(LOG, "ic-proxy-server: new peer to the server");

	/* The identity of the peer is unknown until its HELLO message arrives */
	peer = ic_proxy_peer_new(server->loop,
							 IC_PROXY_INVALID_CONTENT, IC_PROXY_INVALID_DBID);

	ret = uv_accept(server, (uv_stream_t *) &peer->tcp);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new peer: %s",
					 uv_strerror(ret));
		ic_proxy_peer_free(peer);
		return;
	}

	/* TODO: it is better to only touch the states in peer.c */
	peer->state |= IC_PROXY_PEER_STATE_ACCEPTED;

	/* Dump some connection information, not very useful though */
	{
		struct sockaddr_storage peeraddr;
		int			addrlen = sizeof(peeraddr);
		char		name[HOST_NAME_MAX] = "";

		/*
		 * Only look into peeraddr when it is actually filled, otherwise we
		 * would read uninitialized memory.
		 */
		ret = uv_tcp_getpeername(&peer->tcp,
								 (struct sockaddr *) &peeraddr, &addrlen);
		if (ret < 0)
		{
			ic_proxy_log(LOG,
						 "ic-proxy-server: fail to get the address of the new peer: %s",
						 uv_strerror(ret));
		}
		else if (peeraddr.ss_family == AF_INET)
		{
			struct sockaddr_in *peeraddr4 = (struct sockaddr_in *) &peeraddr;

			uv_ip4_name(peeraddr4, name, sizeof(name));

			ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
						 name, ntohs(peeraddr4->sin_port));
		}
		else if (peeraddr.ss_family == AF_INET6)
		{
			struct sockaddr_in6 *peeraddr6 = (struct sockaddr_in6 *) &peeraddr;

			uv_ip6_name(peeraddr6, name, sizeof(name));

			ic_proxy_log(LOG, "ic-proxy-server: the new peer is from %s:%d",
						 name, ntohs(peeraddr6->sin6_port));
		}
	}

	ic_proxy_peer_read_hello(peer);
}
/*
 * Setup the peer listener.
 *
 * The peer listener listens on a tcp socket, the peer connections will come
 * through it.
 *
 * Nothing is done if the proxy addresses are not configured, if the listener
 * is already set up, or if our own port can not be determined.  The function
 * is safe to be called repeatedly, e.g. from the timer callback.
 */
static void
ic_proxy_server_peer_listener_init(uv_loop_t *loop)
{
	struct sockaddr_in addr;
	uv_tcp_t   *listener = &ic_proxy_peer_listener;
	int			port;
	int			fd = -1;
	int			ret;

	if (ic_proxy_addrs == NIL)
		return;

	if (ic_proxy_peer_listening)
		return;

	/* Get my port from the gp_interconnect_proxy_addresses */
	port = ic_proxy_get_my_port();
	if (port < 0)
		/* Cannot get my port, maybe the setting is invalid */
		return;

	/*
	 * TODO: listen on the ip specified in gp_interconnect_proxy_addresses for
	 * better security.
	 */
	uv_ip4_addr("0.0.0.0", port, &addr);

	ic_proxy_log(LOG, "ic-proxy-server: setting up peer listener on port %d",
				 port);

	ret = uv_tcp_init(loop, listener);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to init: %s",
					 uv_strerror(ret));
		return;
	}

	/*
	 * It is important to set TCP_NODELAY, otherwise we will suffer from
	 * significant latency and get very bad OLTP performance.
	 */
	uv_tcp_nodelay(listener, true);

	ret = uv_tcp_bind(listener, (struct sockaddr *) &addr, 0);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to bind: %s",
					 uv_strerror(ret));
		goto failed;
	}

	ret = uv_listen((uv_stream_t *) listener,
					IC_PROXY_BACKLOG, ic_proxy_server_on_new_peer);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: tcp: fail to listen: %s",
					 uv_strerror(ret));
		goto failed;
	}

	uv_fileno((uv_handle_t *) listener, &fd);
	ic_proxy_log(LOG, "ic-proxy-server: tcp: listening on socket %d", fd);

	ic_proxy_peer_listening = true;
	return;

failed:

	/*
	 * The handle is initialized and registered in the loop, it must be
	 * closed before it can be initialized again, otherwise the next attempt
	 * would re-initialize a live handle and corrupt the loop.  Closing is
	 * asynchronous, so keep the flag set to block any re-initialization until
	 * the close callback clears it.
	 */
	ic_proxy_peer_listening = true;
	uv_close((uv_handle_t *) listener,
			 ic_proxy_server_peer_listener_on_closed);
}
/*
 * libuv close callback of the client listener.
 *
 * Once the flag is cleared, ic_proxy_server_client_listener_init() no longer
 * returns early, so a fresh listener is set up the next time it is called,
 * e.g. from the timer callback.
 */
static void
ic_proxy_server_client_listener_on_closed(uv_handle_t *handle)
{
	ic_proxy_client_listening = false;

	ic_proxy_log(LOG, "ic-proxy-server: client listener: closed");
}
/*
 * A new client arrives, this is the libuv connection callback of the client
 * listener.
 *
 * - server: the client listener stream;
 * - status: 0 on success, or a negative libuv error code;
 *
 * On a listener error the listener is closed, its close callback marks it as
 * not listening.  Otherwise the connection is accepted as a new client and we
 * begin to wait for its HELLO message.
 */
static void
ic_proxy_server_on_new_client(uv_stream_t *server, int status)
{
	ICProxyClient *client;
	uv_stream_t *stream;
	int			ret;

	if (status < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: new client error: %s",
					 uv_strerror(status));

		uv_close((uv_handle_t *) server,
				 ic_proxy_server_client_listener_on_closed);
		return;
	}

	ic_proxy_log(LOG, "ic-proxy-server: new client to the server");

	client = ic_proxy_client_new(server->loop, false);
	stream = ic_proxy_client_get_stream(client);

	ret = uv_accept(server, stream);
	if (ret >= 0)
	{
		ic_proxy_client_read_hello(client);
		return;
	}

	/*
	 * NOTE(review): the client is not released on this path, unlike the peer
	 * counterpart which frees the peer; looks like a leak -- confirm how a
	 * client is supposed to be disposed.
	 */
	ic_proxy_log(WARNING, "ic-proxy-server: fail to accept new client: %s",
				 uv_strerror(ret));
}
/*
 * Setup the client listener.
 *
 * The client listener listens on a domain socket, the client connections will
 * come through it.
 *
 * Nothing is done if the listener is already set up.  The function is safe to
 * be called repeatedly, e.g. from the timer callback.
 */
static void
ic_proxy_server_client_listener_init(uv_loop_t *loop)
{
	uv_pipe_t  *listener = &ic_proxy_client_listener;
	char		path[MAXPGPATH];
	int			fd = -1;
	int			ret;

	if (ic_proxy_client_listening)
		return;

	ic_proxy_build_server_sock_path(path, sizeof(path));

	/* FIXME: do not unlink here */
	ic_proxy_log(LOG, "unlink(%s) ...", path);
	unlink(path);

	ic_proxy_log(LOG, "ic-proxy-server: setting up client listener on address %s",
				 path);

	ret = uv_pipe_init(loop, listener, false);
	if (ret < 0)
	{
		ic_proxy_log(WARNING,
					 "ic-proxy-server: fail to init a client listener: %s",
					 uv_strerror(ret));
		return;
	}

	ret = uv_pipe_bind(listener, path);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to bind(%s): %s",
					 path, uv_strerror(ret));
		goto failed;
	}

	ret = uv_listen((uv_stream_t *) listener,
					IC_PROXY_BACKLOG, ic_proxy_server_on_new_client);
	if (ret < 0)
	{
		ic_proxy_log(WARNING, "ic-proxy-server: pipe: fail to listen on path %s: %s",
					 path, uv_strerror(ret));
		goto failed;
	}

	uv_fileno((uv_handle_t *) listener, &fd);
	ic_proxy_log(LOG, "ic-proxy-server: pipe: listening on socket %d", fd);

	/*
	 * Dump the inode of the domain socket file, this helps us to know that the
	 * file is replaced by someone.  This is not likely to happen, we have
	 * carefully chosen the file path to not conflict with each other.
	 */
	{
		struct stat st;

		/*
		 * Only print the stat result when it is valid; dev_t and ino_t are
		 * not necessarily unsigned long, so cast them to match the format.
		 */
		if (stat(path, &st) == 0)
			ic_proxy_log(LOG, "ic-proxy-server: dev=%lu, inode=%lu, path=%s",
						 (unsigned long) st.st_dev,
						 (unsigned long) st.st_ino, path);
		else
			ic_proxy_log(LOG, "ic-proxy-server: fail to stat %s", path);
	}

	ic_proxy_client_listening = true;
	return;

failed:

	/*
	 * The handle is initialized and registered in the loop, it must be
	 * closed before it can be initialized again, otherwise the next attempt
	 * would re-initialize a live handle and corrupt the loop.  Closing is
	 * asynchronous, so keep the flag set to block any re-initialization until
	 * the close callback clears it.
	 */
	ic_proxy_client_listening = true;
	uv_close((uv_handle_t *) listener,
			 ic_proxy_server_client_listener_on_closed);
}
/*
 * Establish the peer connections.
 *
 * A proxy connects to all the other proxies, all these connections form the
 * proxy network.  Only one connection is needed between 2 proxies, this is
 * ensured by a policy that "proxy X connects to proxy Y iff X > Y", where the
 * comparison is made on the content ids.  To support mirror promotion, X
 * attempts to connect to Y even if Y is a mirror, or even if we have
 * connected to Y's primary.  In fact we do not know whether Y is a mirror or
 * not, and we do not care.
 */
static void
ic_proxy_server_ensure_peers(uv_loop_t *loop)
{
	ListCell   *lc;

	foreach(lc, ic_proxy_addrs)
	{
		ICProxyAddr *target = lfirst(lc);

		/*
		 * Only connect to the proxies with a smaller content id, and never to
		 * an entry that carries my own dbid.
		 */
		if (target->content < GpIdentity.segindex &&
			target->dbid != GpIdentity.dbid)
		{
			ICProxyPeer *peer;

			/*
			 * Look the peer up by its id first, then connect to it.  The peer
			 * can be a placeholder, can be connecting, or can be connected
			 * already, ic_proxy_peer_connect() takes care of the state.
			 */
			peer = ic_proxy_peer_blessed_lookup(loop,
												target->content, target->dbid);
			ic_proxy_peer_connect(peer, (struct sockaddr_in *) target);
		}
	}
}
/*
 * Timer callback.
 *
 * Each tick makes sure that the peer listener, the peer connections and the
 * client listener are all in place, (re)creating whatever is missing.
 */
static void
ic_proxy_server_on_timer(uv_timer_t *timer)
{
	uv_loop_t  *loop = timer->loop;

	ic_proxy_server_peer_listener_init(loop);
	ic_proxy_server_ensure_peers(loop);
	ic_proxy_server_client_listener_init(loop);
}
/*
 * Signal callback.
 *
 * It is invoked by libuv as a callback of a signal handle, like the other
 * callbacks in this file.
 *
 * SIGHUP reloads the configuration and the proxy addresses, then refreshes
 * the listeners and the peer connections; any other watched signal stops the
 * event loop.
 */
static void
ic_proxy_server_on_signal(uv_signal_t *handle, int signum)
{
	uv_loop_t  *loop = handle->loop;

	ic_proxy_log(WARNING, "ic-proxy-server: received signal %d", signum);

	if (signum != SIGHUP)
	{
		uv_stop(loop);
		return;
	}

	ProcessConfigFile(PGC_SIGHUP);

	ic_proxy_reload_addresses();

	ic_proxy_server_peer_listener_init(loop);
	ic_proxy_server_ensure_peers(loop);
	ic_proxy_server_client_listener_init(loop);
}
/*
* The main loop of the ic-proxy.
*/
int
ic_proxy_server_main(void)
{
char path[MAXPGPATH];
ic_proxy_log(LOG, "ic-proxy-server: setting up");
ic_proxy_pkt_cache_init(IC_PROXY_MAX_PKT_SIZE);
ic_proxy_reload_addresses();
uv_loop_init(&ic_proxy_server_loop);
ic_proxy_router_init(&ic_proxy_server_loop);
ic_proxy_peer_table_init();
ic_proxy_client_table_init();
ic_proxy_peer_listening = false;
ic_proxy_client_listening = false;
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_hup);
uv_signal_start(&ic_proxy_server_signal_hup, ic_proxy_server_on_signal, SIGHUP);
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_int);
uv_signal_start(&ic_proxy_server_signal_hup, ic_proxy_server_on_signal, SIGINT);
/* on master */
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_term);
uv_signal_start(&ic_proxy_server_signal_term, ic_proxy_server_on_signal, SIGTERM);
/* on segments */
uv_signal_init(&ic_proxy_server_loop, &ic_proxy_server_signal_stop);
uv_signal_start(&ic_proxy_server_signal_stop, ic_proxy_server_on_signal, SIGQUIT);
/* TODO: we could stop the timer if all the peers are connected */
uv_timer_init(&ic_proxy_server_loop, &ic_proxy_server_timer);
uv_timer_start(&ic_proxy_server_timer, ic_proxy_server_on_timer, 100, 1000);
ic_proxy_log(LOG, "ic-proxy-server: running");
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/*
* return non-zero value so we are restarted by the postmaster, but this
* behavior can be controled by calling ic_proxy_server_quit()
*/
ic_proxy_server_exit_code = 1;
uv_run(&ic_proxy_server_loop, UV_RUN_DEFAULT);
uv_loop_close(&ic_proxy_server_loop);
ic_proxy_log(LOG, "ic-proxy-server: closing");
ic_proxy_client_table_uninit();
ic_proxy_peer_table_uninit();
ic_proxy_router_uninit();
ic_proxy_build_server_sock_path(path, sizeof(path));
#if 0
ic_proxy_log(LOG, "unlink(%s) ...", path);
unlink(path);
#endif
ic_proxy_pkt_cache_uninit();
ic_proxy_log(LOG, "ic-proxy-server: closed with code %d",
ic_proxy_server_exit_code);
return ic_proxy_server_exit_code;
}
/*
 * Ask the proxy server to quit.
 *
 * - loop: the libuv loop, currently unused;
 * - relaunch: when true the server exits with a non-zero code, so it is
 *   restarted by the postmaster; otherwise it exits with 0;
 *
 * The loop is not closed here.  The listeners are closed, the timer is
 * stopped and the signal handles are unreferenced, so the loop quits by itself
 * once nothing else keeps it alive.
 */
void
ic_proxy_server_quit(uv_loop_t *loop, bool relaunch)
{
	ic_proxy_log(LOG, "ic-proxy-server: quiting");

	/* return non-zero value so we are restarted by the postmaster */
	ic_proxy_server_exit_code = relaunch ? 1 : 0;

	/*
	 * we can't close the loop directly, we need to properly shutdown all the
	 * clients first.
	 *
	 * A listener can be in the middle of closing already, for example after a
	 * listener error; uv_close() must not be called on it again.
	 */
	if (ic_proxy_peer_listening &&
		!uv_is_closing((uv_handle_t *) &ic_proxy_peer_listener))
	{
		uv_unref((uv_handle_t *) &ic_proxy_peer_listener);
		uv_close((uv_handle_t *) &ic_proxy_peer_listener, NULL);
	}

	if (ic_proxy_client_listening &&
		!uv_is_closing((uv_handle_t *) &ic_proxy_client_listener))
	{
		uv_unref((uv_handle_t *) &ic_proxy_client_listener);
		uv_close((uv_handle_t *) &ic_proxy_client_listener, NULL);
	}

	uv_timer_stop(&ic_proxy_server_timer);

	/*
	 * None of the signal handles should keep the loop alive from now on;
	 * uv_unref() is idempotent, so it is harmless on a handle that is not
	 * referenced.
	 */
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_hup);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_int);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_term);
	uv_unref((uv_handle_t *) &ic_proxy_server_signal_stop);

	/*
	 * do not close the loop directly, it will quit automatically after all the
	 * clients are closed.
	 */
}
/*-------------------------------------------------------------------------
*
* ic_proxy_message.c
*
* Interconnect Proxy Packet and Message
*
* Similar to the ic-udp, in ic-proxy mode we also need to transfer the data as
* packets, the packet header contains all the necessary information to
* identify the sender, the receiver, as well the sequence (session id, command
* id, slice id).
*
* A message is a special kind of packet, it contains only the header, no
* payload.
*
* Packets and messages are all allocated from the packet cache, they must be
* freed with the ic_proxy_pkt_cache_free() function.
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "ic_proxy.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_pkt_cache.h"
/*
 * Translate a message type to its name.
 *
 * Return a constant string, or "UNKNOWN" for a type that has no name.
 */
const char *
ic_proxy_message_type_to_str(ICProxyMessageType type)
{
	/* the names, indexed by the message type */
	static const char *const names[] = {
		[IC_PROXY_MESSAGE_DATA] = "DATA",
		[IC_PROXY_MESSAGE_HELLO] = "HELLO",
		[IC_PROXY_MESSAGE_HELLO_ACK] = "HELLO ACK",
		[IC_PROXY_MESSAGE_PEER_QUIT] = "PEER QUIT",
		[IC_PROXY_MESSAGE_BYE] = "BYE",
		[IC_PROXY_MESSAGE_PAUSE] = "PAUSE",
		[IC_PROXY_MESSAGE_RESUME] = "RESUME",
	};
	const char *name = NULL;

	if ((int) type >= 0 && (size_t) type < sizeof(names) / sizeof(names[0]))
		name = names[type];

	return name ? name : "UNKNOWN";
}
/*
 * Allocate a message of the given type and fill its header from the key.
 *
 * The message comes from the packet cache, it must be released with the
 * ic_proxy_pkt_cache_free() function.
 */
ICProxyPkt *
ic_proxy_message_new(ICProxyMessageType type, const ICProxyKey *key)
{
	ICProxyPkt *msg;

	msg = ic_proxy_pkt_cache_alloc(NULL);
	ic_proxy_message_init(msg, type, key);

	return msg;
}
/*
 * Fill a packet header as a message of the given type.
 *
 * The local endpoint of the key becomes the source of the message, the remote
 * endpoint becomes the destination.  The length is set to the header size,
 * which means no payload.
 *
 * The pkt must be large enough to contain a packet header.
 */
void
ic_proxy_message_init(ICProxyPkt *pkt, ICProxyMessageType type,
					  const ICProxyKey *key)
{
	/* the message itself */
	pkt->len = sizeof(*pkt);
	pkt->type = type;

	/* source <- the local endpoint */
	pkt->srcContentId = key->localContentId;
	pkt->srcDbid = key->localDbid;
	pkt->srcPid = key->localPid;

	/* destination <- the remote endpoint */
	pkt->dstContentId = key->remoteContentId;
	pkt->dstDbid = key->remoteDbid;
	pkt->dstPid = key->remotePid;

	/* the query and the slices */
	pkt->sessionId = key->sessionId;
	pkt->commandId = key->commandId;
	pkt->sendSliceIndex = key->sendSliceIndex;
	pkt->recvSliceIndex = key->recvSliceIndex;
}
/*
 * Allocate a DATA packet and copy the payload into it.
 *
 * - key: describes the sender and the receiver of the packet;
 * - data: the payload, "size" bytes of it are copied right after the header;
 * - size: the payload size, together with the header it must not exceed
 *   IC_PROXY_MAX_PKT_SIZE;
 *
 * The packet comes from the packet cache, it must be released with the
 * ic_proxy_pkt_cache_free() function.
 */
ICProxyPkt *
ic_proxy_pkt_new(const ICProxyKey *key, const void *data, uint16 size)
{
	ICProxyPkt *pkt;
	char	   *payload;

	Assert(size + sizeof(*pkt) <= IC_PROXY_MAX_PKT_SIZE);

	pkt = ic_proxy_pkt_cache_alloc(NULL);
	payload = ((char *) pkt) + sizeof(*pkt);

	/* the header first, it sets the length to the header size */
	ic_proxy_message_init(pkt, IC_PROXY_MESSAGE_DATA, key);

	/* then the payload, and count it into the length */
	pkt->len = sizeof(*pkt) + size;
	memcpy(payload, data, size);

	return pkt;
}
/*
 * Make a copy of a packet.
 *
 * The first pkt->len bytes, which is the header and the payload, are copied
 * to a newly allocated packet.
 *
 * The copy comes from the packet cache, it must be released with the
 * ic_proxy_pkt_cache_free() function.
 */
ICProxyPkt *
ic_proxy_pkt_dup(const ICProxyPkt *pkt)
{
	ICProxyPkt *copy = ic_proxy_pkt_cache_alloc(NULL);

	/* memcpy() returns the destination */
	return memcpy(copy, pkt, pkt->len);
}
/*
 * Build a descriptive string of a packet.
 *
 * The string contains all the header information and can be used in log &
 * error messages.
 *
 * Return the string, which must not be freed.  The string lives in a static
 * buffer, so a second call to this function overwrites the result of the
 * previous call; copy it if it has to survive.
 */
const char *
ic_proxy_pkt_to_str(const ICProxyPkt *pkt)
{
	static char buf[256];

	/*
	 * commandId is unsigned (uint32), so it must be printed with %u; with %d
	 * large command ids would be shown as negative numbers.
	 */
	snprintf(buf, sizeof(buf),
			 "%s [con%d,cmd%u,slice[%hd->%hd] %hu bytes seg%hd:dbid%hu:p%d->seg%hd:dbid%hu:p%d]",
			 ic_proxy_message_type_to_str(pkt->type),
			 pkt->sessionId, pkt->commandId,
			 pkt->sendSliceIndex, pkt->recvSliceIndex,
			 pkt->len,
			 pkt->srcContentId, pkt->srcDbid, pkt->srcPid,
			 pkt->dstContentId, pkt->dstDbid, pkt->dstPid);

	return buf;
}
/*
 * Check whether a packet is sent by a client.
 *
 * The client is identified by the key: the source of the packet must match
 * the local endpoint of the key, the destination must match the remote
 * endpoint, and the slice indexes must be the same.  Only the dbids and the
 * pids of the endpoints are compared.
 */
bool
ic_proxy_pkt_is_from_client(const ICProxyPkt *pkt, const ICProxyKey *key)
{
	/* source == local */
	if (pkt->srcDbid != key->localDbid || pkt->srcPid != key->localPid)
		return false;

	/* destination == remote */
	if (pkt->dstDbid != key->remoteDbid || pkt->dstPid != key->remotePid)
		return false;

	return (pkt->sendSliceIndex == key->sendSliceIndex &&
			pkt->recvSliceIndex == key->recvSliceIndex);
}
/*
 * Check whether a packet is sent to a client.
 *
 * The client is identified by the key: the destination of the packet must
 * match the local endpoint of the key, the source must match the remote
 * endpoint, and the slice indexes must be the same.  Only the dbids and the
 * pids of the endpoints are compared.
 */
bool
ic_proxy_pkt_is_to_client(const ICProxyPkt *pkt, const ICProxyKey *key)
{
	/* destination == local */
	if (pkt->dstDbid != key->localDbid || pkt->dstPid != key->localPid)
		return false;

	/* source == remote */
	if (pkt->srcDbid != key->remoteDbid || pkt->srcPid != key->remotePid)
		return false;

	return (pkt->sendSliceIndex == key->sendSliceIndex &&
			pkt->recvSliceIndex == key->recvSliceIndex);
}
/*
 * Check whether a packet is live to a client.
 *
 * The client is identified by the key.
 *
 * A packet is live when it carries exactly the same (sessionId, commandId)
 * pair as the client.
 */
bool
ic_proxy_pkt_is_live(const ICProxyPkt *pkt, const ICProxyKey *key)
{
	if (pkt->sessionId != key->sessionId)
		return false;

	return pkt->commandId == key->commandId;
}
/*
 * Check whether a packet is out-of-date to a client.
 *
 * The client is identified by the key.
 *
 * The (sessionId, commandId) pairs are compared in lexicographical order, a
 * packet is out-of-date if
 *
 *     pkt.(sessionId, commandId) < client.(sessionId, commandId)
 */
bool
ic_proxy_pkt_is_out_of_date(const ICProxyPkt *pkt, const ICProxyKey *key)
{
	/* the session id decides unless it is a tie */
	if (pkt->sessionId != key->sessionId)
		return pkt->sessionId < key->sessionId;

	return pkt->commandId < key->commandId;
}
/*
 * Check whether a packet is in the future to a client.
 *
 * The client is identified by the key.
 *
 * The (sessionId, commandId) pairs are compared in lexicographical order, a
 * packet is in the future if
 *
 *     pkt.(sessionId, commandId) > client.(sessionId, commandId)
 */
bool
ic_proxy_pkt_is_in_the_future(const ICProxyPkt *pkt, const ICProxyKey *key)
{
	/* the session id decides unless it is a tie */
	if (pkt->sessionId != key->sessionId)
		return pkt->sessionId > key->sessionId;

	return pkt->commandId > key->commandId;
}
/*-------------------------------------------------------------------------
*
* ic_proxy_packet.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_PACKET_H
#define IC_PROXY_PACKET_H
#include "postgres.h"
#include "cdb/cdbvars.h"
typedef struct ICProxyPkt ICProxyPkt;
/* we have to include it after the declaration of ICProxyPkt */
#include "ic_proxy_key.h"
/* The max size of a packet: Gp_max_packet_size plus the ICProxyPkt header */
#define IC_PROXY_MAX_PKT_SIZE (Gp_max_packet_size + sizeof(ICProxyPkt))
/*
 * The type of a packet, stored in the "type" field of the packet header.
 */
typedef enum
{
/* a packet that carries data */
IC_PROXY_MESSAGE_DATA = 0,
/* TODO: separate peer messages and client messages */
/*
 * these are common messages of peers and clients, however they have
 * slightly different meanings between peers and clients, it's better to
 * separate them.
 */
IC_PROXY_MESSAGE_HELLO,
IC_PROXY_MESSAGE_HELLO_ACK,
/* these are peer messages */
IC_PROXY_MESSAGE_PEER_QUIT,
/* these are client messages */
IC_PROXY_MESSAGE_BYE,
IC_PROXY_MESSAGE_PAUSE,
IC_PROXY_MESSAGE_RESUME,
} ICProxyMessageType;
/*
 * The packet header.
 *
 * A message is a packet with only the header; a data packet has its payload
 * placed right after the header.
 */
struct ICProxyPkt
{
/* the packet length in bytes, the header itself is counted */
uint16 len;
/* the message type, an ICProxyMessageType value */
uint16 type;
/* the source endpoint: content id, dbid and pid */
int16 srcContentId;
uint16 srcDbid;
int32 srcPid;
/* the destination endpoint: content id, dbid and pid */
int16 dstContentId;
uint16 dstDbid;
int32 dstPid;
/* the session and the command the packet belongs to */
int32 sessionId;
uint32 commandId;
/* the sender slice and the receiver slice */
int16 sendSliceIndex;
int16 recvSliceIndex;
};
/* Get the name of a message type, "UNKNOWN" for an unrecognized one */
const char *ic_proxy_message_type_to_str(ICProxyMessageType type);
/* Allocate a message from the packet cache and initialize it from the key */
extern ICProxyPkt *ic_proxy_message_new(ICProxyMessageType type,
const ICProxyKey *key);
/* Initialize a caller provided packet header as a message */
extern void ic_proxy_message_init(ICProxyPkt *pkt,
ICProxyMessageType type,
const ICProxyKey *key);
/* Allocate a DATA packet from the packet cache, the data is copied into it */
extern ICProxyPkt *ic_proxy_pkt_new(const ICProxyKey *key,
const void *data, uint16 size);
/* Duplicate a packet, the copy is allocated from the packet cache */
extern ICProxyPkt *ic_proxy_pkt_dup(const ICProxyPkt *pkt);
/* Describe a packet; the result is in a static buffer and must not be freed */
extern const char *ic_proxy_pkt_to_str(const ICProxyPkt *pkt);
/* Direction checks of a packet against the client identified by the key */
extern bool ic_proxy_pkt_is_from_client(const ICProxyPkt *pkt,
const ICProxyKey *key);
extern bool ic_proxy_pkt_is_to_client(const ICProxyPkt *pkt,
const ICProxyKey *key);
/* Comparisons of the packet's (sessionId, commandId) with the client's */
extern bool ic_proxy_pkt_is_live(const ICProxyPkt *pkt,
const ICProxyKey *key);
extern bool ic_proxy_pkt_is_out_of_date(const ICProxyPkt *pkt,
const ICProxyKey *key);
extern bool ic_proxy_pkt_is_in_the_future(const ICProxyPkt *pkt,
const ICProxyKey *key);
static inline ICProxyMessageType
ic_proxy_message_get_type(const ICProxyPkt *pkt)
{
return pkt->type;
}
/*
 * Check whether a packet is of the given message type.
 *
 * The packet must be valid, which means it is at least as large as a header.
 */
static inline bool
ic_proxy_pkt_is(const ICProxyPkt *pkt, ICProxyMessageType type)
{
	/* sanity checks first */
	Assert(pkt);
	Assert(pkt->len >= sizeof(*pkt));

	return ic_proxy_message_get_type(pkt) == type;
}
#endif /* IC_PROXY_PACKET_H */
此差异已折叠。
/*-------------------------------------------------------------------------
*
* ic_proxy_pkt_cache.c
*
* Interconnect Proxy Packet Cache
*
* Libuv needs us to allocate the packet buffer, and it does not reuse the
* buffer, so it is expensive to repeatedly allocate and free the packets.
*
* To make it more efficient we save all the freed packets in a free list, and
* reuse them later.
*
* All the allocated packets are of the same size, the max possible packet
* size, discarding the size requested by libuv, so the packet buffer can be
* safely reused later.
*
* TODO:
* - many libuv requests, such as uv_write(), needs us to allocate the request
* buffer, they are not reused, too, we could consider saving them in a
* free list similarly, or even share the same free list with packets;
* - we need to limit the size of the free list, currently packets are never
* freed;
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#define IC_PROXY_LOG_LEVEL WARNING
#include "ic_proxy.h"
#include "ic_proxy_pkt_cache.h"
#include <uv.h>
typedef struct ICProxyPktCache ICProxyPktCache;
/*
 * A simple free list.
 *
 * A freed packet buffer is reinterpreted as a list node, so the link is
 * stored inside the buffer itself and no extra memory is needed.
 */
struct ICProxyPktCache
{
/* the next free packet, or NULL at the end of the list */
ICProxyPktCache *next;
};
/*
 * The state of the packet cache, there is only one cache per process.
 */
static struct
{
ICProxyPktCache *freelist; /* the free list */
uint32 pkt_size; /* the packet size for all the packets */
uint32 n_free; /* count of packets in the free list */
uint32 n_total; /* count of all the allocated packets */
} ic_proxy_pkt_cache;
/*
 * Set up an empty packet cache.
 *
 * - pkt_size: the buffer size used for every packet allocated later;
 */
void
ic_proxy_pkt_cache_init(uint32 pkt_size)
{
	/* nothing is allocated yet */
	ic_proxy_pkt_cache.n_total = 0;
	ic_proxy_pkt_cache.n_free = 0;
	ic_proxy_pkt_cache.freelist = NULL;

	ic_proxy_pkt_cache.pkt_size = pkt_size;
}
/*
 * Release all the packets in the free list.
 *
 * Only the packets that were returned to the cache are released; the
 * counters are left as they are.
 */
void
ic_proxy_pkt_cache_uninit(void)
{
	ICProxyPktCache *cpkt = ic_proxy_pkt_cache.freelist;

	/* detach the whole list, then walk it */
	ic_proxy_pkt_cache.freelist = NULL;

	while (cpkt != NULL)
	{
		ICProxyPktCache *next = cpkt->next;

		ic_proxy_free(cpkt);
		cpkt = next;
	}
}
/*
 * Allocate a packet from the cache.
 *
 * If the free list is empty a new packet is allocated and returned, otherwise
 * one is detached from the free list and returned directly.
 *
 * If pkt_size is not NULL it is set with the actual packet buffer size, which
 * is the size passed to ic_proxy_pkt_cache_init().
 *
 * Return the packet buffer, its content is not initialized.
 */
void *
ic_proxy_pkt_cache_alloc(size_t *pkt_size)
{
	ICProxyPktCache *cpkt;

	if (ic_proxy_pkt_cache.freelist)
	{
		/* reuse the head of the free list */
		cpkt = ic_proxy_pkt_cache.freelist;

		ic_proxy_pkt_cache.freelist = cpkt->next;
		ic_proxy_pkt_cache.n_free--;
	}
	else
	{
		cpkt = ic_proxy_alloc(ic_proxy_pkt_cache.pkt_size);
		ic_proxy_pkt_cache.n_total++;
	}

	if (pkt_size)
		*pkt_size = ic_proxy_pkt_cache.pkt_size;

#if 0
	/* for debug purpose */
	memset(cpkt, 0, ic_proxy_pkt_cache.pkt_size);
#endif

	/* the counters are unsigned, so print them with %u */
	ic_proxy_log(LOG, "pkt-cache: allocated, %u free, %u total",
				 ic_proxy_pkt_cache.n_free, ic_proxy_pkt_cache.n_total);

	return cpkt;
}
/*
 * The libuv flavor of ic_proxy_pkt_cache_alloc().
 *
 * The signature matches the libuv uv_alloc_cb callback.  The size suggested
 * by libuv is ignored, the buffer always has the cache's packet size.
 */
void
ic_proxy_pkt_cache_alloc_buffer(uv_handle_t *handle, size_t size, uv_buf_t *buf)
{
	size_t		actual = 0;

	buf->base = ic_proxy_pkt_cache_alloc(&actual);
	buf->len = actual;
}
/*
 * Return a packet to the free list.
 *
 * The packet is not released to the system, it is put at the head of the free
 * list and will be reused by a later ic_proxy_pkt_cache_alloc().  The first
 * bytes of the buffer are overwritten with the list link, so the packet must
 * not be touched by the caller afterwards.
 */
void
ic_proxy_pkt_cache_free(void *pkt)
{
	ICProxyPktCache *cpkt = pkt;

#if 0
	/* for debug purpose */
	memset(cpkt, 0, ic_proxy_pkt_cache.pkt_size);

	for (ICProxyPktCache *iter = ic_proxy_pkt_cache.freelist;
		 iter; iter = iter->next)
		Assert(iter != cpkt);
#endif

	cpkt->next = ic_proxy_pkt_cache.freelist;
	ic_proxy_pkt_cache.freelist = cpkt;
	ic_proxy_pkt_cache.n_free++;

	/* the counters are unsigned, so print them with %u */
	ic_proxy_log(LOG, "pkt-cache: recycled, %u free, %u total",
				 ic_proxy_pkt_cache.n_free, ic_proxy_pkt_cache.n_total);
}
/*-------------------------------------------------------------------------
*
* ic_proxy_pkt_cache.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_PKT_CACHE_H
#define IC_PROXY_PKT_CACHE_H
#include <uv.h>
/* Initialize the cache, all the packets will have the size of pkt_size */
extern void ic_proxy_pkt_cache_init(uint32 pkt_size);
/* Release the packets in the free list */
extern void ic_proxy_pkt_cache_uninit(void);
/* Allocate a packet buffer; *pkt_size, if not NULL, receives the buffer size */
extern void *ic_proxy_pkt_cache_alloc(size_t *pkt_size);
/* The same, with a signature that can be used as the libuv uv_alloc_cb */
extern void ic_proxy_pkt_cache_alloc_buffer(uv_handle_t *handle,
size_t size, uv_buf_t *buf);
/* Put a packet buffer back to the free list */
extern void ic_proxy_pkt_cache_free(void *pkt);
#endif /* IC_PROXY_PKT_CACHE_H */
/*-------------------------------------------------------------------------
*
* ic_proxy_router.c
*
* Interconnect Proxy Router
*
* A router routes a packet to the correct target, a client or a peer.
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include "ic_proxy.h"
#include "ic_proxy_server.h"
#include "ic_proxy_router.h"
#include "ic_proxy_packet.h"
#include "ic_proxy_pkt_cache.h"
#include "ic_proxy_server.h"
typedef struct ICProxyWriteReq ICProxyWriteReq;
typedef struct ICProxyLoopback ICProxyLoopback;
/*
 * A router write request.
 *
 * It is similar to the libuv write request, however the router will take care
 * of the common part, such as the freeing of the packet and the request, so
 * the caller can focus on the real business.
 *
 * The libuv request must stay as the first member: the write callback casts
 * the uv_write_t pointer back to an ICProxyWriteReq pointer.
 */
struct ICProxyWriteReq
{
uv_write_t req; /* the libuv write request */
ic_proxy_sent_cb callback; /* the callback */
void *opaque; /* the callback data */
};
/*
 * The loopback packet queue.
 *
 * Loopback packets can not be routed immediately, refer to ICProxyDelay for
 * details. They are first put in a queue, and are actually routed in a libuv
 * check callback, it is triggered after all the current I/O events are
 * handled, so there will be no misordered packets, and no reentrance to some
 * critical functions.
 */
struct ICProxyLoopback
{
uv_check_t check; /* the libuv check handle */
/* the queued loopback packets, the elements are ICProxyDelay pointers */
List *queue; /* List<ICProxyDelay *> */
};
/* The only loopback queue, set up by ic_proxy_router_init() */
static ICProxyLoopback ic_proxy_router_loopback;
/*
 * The loopback check is triggered.
 *
 * This is the libuv check callback of the loopback queue, it routes all the
 * queued loopback packets to their target clients, then releases the queue.
 */
static void
ic_proxy_router_loopback_on_check(uv_check_t *handle)
{
List *queue;
ListCell *cell;
/*
 * Stop the check callback, all the queued loopback packets will be routed
 * in this round. This must happen before routing the packets, so if new
 * loopback packets are queued, the check callback can be re-turned on,
 * those packets will be handled next round.
 *
 * TODO: the new loopback packets can be handled in this round, too.
 * Queueing them to next round means it will not be triggered until some
 * I/O events happen. In current logic there is the per second timer, so
 * in worst case the new loopback packets are delayed for 1 second. If the
 * timer is paused in the future, as an optimization, then in worst case
 * the new packets may not get a chance to be routed. The only concern on
 * handling them now is that if an infinite ping-pong happens, this function
 * would never return to the mainloop. It's unlikely to happen, though,
 * and we could prevent that by adding a round count limit.
 */
uv_check_stop(&ic_proxy_router_loopback.check);
/*
 * We must detach the queue before handling it, in case some new packets
 * are queued during the process
 */
queue = ic_proxy_router_loopback.queue;
ic_proxy_router_loopback.queue = NIL;
foreach(cell, queue)
{
ICProxyDelay *delay = lfirst(cell);
ICProxyClient *client;
ICProxyKey key;
/* Loopback packets are always to a loopback client */
ic_proxy_key_from_p2c_pkt(&key, delay->pkt);
client = ic_proxy_client_blessed_lookup(handle->loop, &key);
ic_proxy_log(LOG, "ic-proxy-router: looped back %s to %s",
ic_proxy_pkt_to_str(delay->pkt),
ic_proxy_client_get_name(client));
ic_proxy_client_on_p2c_data(client, delay->pkt,
delay->callback, delay->opaque);
/*
 * do not forget to call the callback
 *
 * NOTE(review): the callback was already handed to
 * ic_proxy_client_on_p2c_data() above, and the non-loopback local path in
 * ic_proxy_router_route() does not call it again afterwards. This looks
 * like a possible double invocation, and delay->pkt is used here after it
 * was passed on -- confirm against ic_proxy_client_on_p2c_data().
 */
if (delay->callback)
delay->callback(delay->opaque, delay->pkt, 0);
/* and do not forget to free the memory */
ic_proxy_free(delay);
}
/* only the list itself is released here, the elements are freed above */
list_free(queue);
}
/*
 * Queue a loopback packet, it will be routed in the next libuv check
 * callback.
 *
 * - pkt: the loopback packet;
 * - callback: the callback function, can be NULL;
 * - opaque: the callback data;
 */
static void
ic_proxy_router_loopback_push(ICProxyPkt *pkt,
							  ic_proxy_sent_cb callback, void *opaque)
{
	ICProxyLoopback *loopback = &ic_proxy_router_loopback;
	ICProxyDelay *delay;

	ic_proxy_log(LOG, "ic-proxy-router: looping back %s",
				 ic_proxy_pkt_to_str(pkt));

	/*
	 * An empty queue means the check callback is not armed, arm it for the
	 * packet we are about to queue.
	 */
	if (loopback->queue == NIL)
		uv_check_start(&loopback->check, ic_proxy_router_loopback_on_check);

	/*
	 * No peer is involved in a loopback, the target is always a loopback
	 * client, so NULL is passed as the peer.
	 */
	delay = ic_proxy_peer_build_delay(NULL, pkt, callback, opaque);

	loopback->queue = lappend(loopback->queue, delay);
}
/*
 * Set up the router.
 *
 * The loopback queue starts empty, and its check handle is initialized on the
 * given loop; the handle is not started until a loopback packet is queued.
 */
void
ic_proxy_router_init(uv_loop_t *loop)
{
	ICProxyLoopback *loopback = &ic_proxy_router_loopback;

	loopback->queue = NIL;
	uv_check_init(loop, &loopback->check);
}
/*
 * Tear down the router.
 *
 * The check callback is stopped, and the loopback packets that are still
 * queued are dropped: the packets go back to the packet cache, the queue
 * entries are freed.
 */
void
ic_proxy_router_uninit(void)
{
	List	   *pending = ic_proxy_router_loopback.queue;
	ListCell   *lc;

	ic_proxy_router_loopback.queue = NIL;

	uv_check_stop(&ic_proxy_router_loopback.check);

	foreach(lc, pending)
	{
		ICProxyDelay *delay = lfirst(lc);

		/*
		 * TODO: this function is only called on exiting, so it's better to
		 * drop the callbacks silently, right?  They are not invoked at
		 * present.
		 */
		ic_proxy_pkt_cache_free(delay->pkt);
		ic_proxy_free(delay);
	}

	list_free(pending);
}
/*
 * Route a packet to its target.
 *
 * - loop: the libuv loop;
 * - pkt: the packet to route;
 * - callback: the callback function, can be NULL;
 * - opaque: the callback data;
 *
 * There are 3 kinds of targets, checked in this order:
 * - loopback: the source and the destination have the same dbid, the packet
 *   is queued and delivered in a libuv check callback;
 * - a local client: the destination dbid is mine, the packet is passed to the
 *   client directly;
 * - a remote client: anything else, the packet is handed to the peer of the
 *   destination segment.
 */
void
ic_proxy_router_route(uv_loop_t *loop, ICProxyPkt *pkt,
					  ic_proxy_sent_cb callback, void *opaque)
{
	ICProxyPeer *peer;

	if (pkt->dstDbid == pkt->srcDbid)
	{
		/*
		 * A loopback packet does not need to go through a peer, it could be
		 * passed to the target client via ic_proxy_client_on_p2c_data(),
		 * however that function is not reentrant, and a reentrance would
		 * happen on PAUSE & RESUME messages, so the delivery is scheduled in
		 * a libuv check callback.
		 *
		 * TODO: when callback is NULL, we could pass the packet immediately.
		 */
		ic_proxy_router_loopback_push(pkt, callback, opaque);
		return;
	}

	if (pkt->dstDbid == GpIdentity.dbid)
	{
		ICProxyClient *client;
		ICProxyKey	key;

		ic_proxy_key_from_p2c_pkt(&key, pkt);
		client = ic_proxy_client_blessed_lookup(loop, &key);

		ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
					 ic_proxy_pkt_to_str(pkt),
					 ic_proxy_client_get_name(client));

		ic_proxy_client_on_p2c_data(client, pkt, callback, opaque);
		return;
	}

	/* the target is on another segment, let the peer carry the packet */
	peer = ic_proxy_peer_blessed_lookup(loop,
										pkt->dstContentId, pkt->dstDbid);

	ic_proxy_log(LOG, "ic-proxy-router: routing %s to %s",
				 ic_proxy_pkt_to_str(pkt), peer->name);

	ic_proxy_peer_route_data(peer, pkt, callback, opaque);
}
/*
 * libuv write callback of ic_proxy_router_write().
 *
 * - req: the write request, it is actually an ICProxyWriteReq;
 * - status: 0 on success, or a negative libuv error code;
 *
 * The result is reported to the caller's callback, if there is one, then the
 * packet goes back to the packet cache and the request is freed.
 */
static void
ic_proxy_router_on_write(uv_write_t *req, int status)
{
	ICProxyWriteReq *wreq = (ICProxyWriteReq *) req;
	ICProxyPkt *pkt = uv_req_get_data((uv_req_t *) req);

	if (status >= 0)
		ic_proxy_log(LOG, "ic-proxy-router: sent %s",
					 ic_proxy_pkt_to_str(pkt));
	else
		ic_proxy_log(LOG, "ic-proxy-router: fail to send %s: %s",
					 ic_proxy_pkt_to_str(pkt), uv_strerror(status));

	/* the callback sees the packet before it is recycled */
	if (wreq->callback)
		wreq->callback(wreq->opaque, pkt, status);

	ic_proxy_pkt_cache_free(pkt);
	ic_proxy_free(wreq);
}
/*
 * Write a packet to a libuv stream.
 *
 * This is a simple wrapper for the uv_write() function. The boring parts,
 * like buffer & request management, are handled by this wrapper, so the caller
 * can focus on the real business.
 *
 * It can write the packet at a specific offset, this is useful when writing
 * data from the client to the backend, the backend wants headless data, so the
 * client can specify sizeof(ICProxyPkt) as the offset.
 *
 * The packet and the internal write request are always released by
 * ic_proxy_router_on_write(), and the callback, if any, is always invoked
 * exactly once with the final status.  This also holds when uv_write() refuses
 * the request right away: libuv does not run the write callback in that case,
 * so the completion handler is called directly with the returned error code;
 * note that the callback then runs before this function returns.
 *
 * - stream: the target stream, usually a peer or a client;
 * - pkt: the data to write, the ownership is taken;
 * - offset: the data offset to write from, usually 0 when writing to a peer,
 *   or sizeof(ICProxyPkt) when writing to a client;
 * - callback: the callback function;
 * - opaque: the callback data;
 */
void
ic_proxy_router_write(uv_stream_t *stream, ICProxyPkt *pkt, int32 offset,
					  ic_proxy_sent_cb callback, void *opaque)
{
	ICProxyWriteReq *wreq;
	uv_buf_t	wbuf;
	int			ret;

	ic_proxy_log(LOG, "ic-proxy-router: sending %s", ic_proxy_pkt_to_str(pkt));

	wreq = ic_proxy_new(ICProxyWriteReq);

	/* the packet travels with the request, so the handler can release it */
	uv_req_set_data((uv_req_t *) wreq, pkt);

	wreq->callback = callback;
	wreq->opaque = opaque;

	/* skip the first "offset" bytes of the packet */
	wbuf.base = ((char *) pkt) + offset;
	wbuf.len = pkt->len - offset;

	ret = uv_write(&wreq->req, stream, &wbuf, 1, ic_proxy_router_on_write);
	if (ret < 0)
	{
		/*
		 * The request was not queued, so libuv will never call the handler
		 * for it.  Report the failure through the same completion path,
		 * otherwise the request and the packet would leak and the caller
		 * would never be notified.
		 */
		ic_proxy_router_on_write(&wreq->req, ret);
	}
}
/*-------------------------------------------------------------------------
*
* ic_proxy_router.h
*
*
* Copyright (c) 2020-Present Pivotal Software, Inc.
*
*
*-------------------------------------------------------------------------
*/
#ifndef IC_PROXY_ROUTER_H
#define IC_PROXY_ROUTER_H
#include "postgres.h"
#include <uv.h>
/*
 * NOTE(review): ICProxyPkt is used below but is not declared by anything
 * included here; presumably the includer must pull in its declaration first
 * -- confirm.
 */
/*
 * Callback invoked when the router has finished with a packet.
 *
 * - opaque: the callback data that was passed together with the callback;
 * - pkt: the packet that was handled, the router releases it after the
 *   callback returns when it is invoked from the write completion handler;
 * - status: a libuv status code, a negative value means failure;
 */
typedef void (* ic_proxy_sent_cb) (void *opaque,
const ICProxyPkt *pkt, int status);
/*
 * Initialize the router: the loopback check handle is initialized on the loop
 * and the loopback queue is emptied.
 */
extern void ic_proxy_router_init(uv_loop_t *loop);
/*
 * Cleanup the router: the loopback check handle is stopped and the queued
 * loopback packets are released without invoking their callbacks.
 */
extern void ic_proxy_router_uninit(void);
/*
 * Route a packet according to its source / destination dbid: to the loopback
 * queue, to a client, or to a peer.  The callback and opaque are passed
 * through to the chosen target.
 */
extern void ic_proxy_router_route(uv_loop_t *loop, ICProxyPkt *pkt,
ic_proxy_sent_cb callback, void *opaque);
/*
 * Write a packet to a libuv stream, starting at the given byte offset of the
 * packet.  The ownership of the packet is taken.
 */
extern void ic_proxy_router_write(uv_stream_t *stream,
ICProxyPkt *pkt, int32 offset,
ic_proxy_sent_cb callback, void *opaque);
#endif /* IC_PROXY_ROUTER_H */
此差异已折叠。
...@@ -284,6 +284,8 @@ typedef enum GpVars_Interconnect_Type ...@@ -284,6 +284,8 @@ typedef enum GpVars_Interconnect_Type
extern int Gp_interconnect_type; extern int Gp_interconnect_type;
extern char *gp_interconnect_proxy_addresses;
typedef enum GpVars_Interconnect_Method typedef enum GpVars_Interconnect_Method
{ {
INTERCONNECT_FC_METHOD_CAPACITY = 0, INTERCONNECT_FC_METHOD_CAPACITY = 0,
......
...@@ -378,6 +378,9 @@ ...@@ -378,6 +378,9 @@
/* Define to 1 if you have the `ssl' library (-lssl). */ /* Define to 1 if you have the `ssl' library (-lssl). */
#undef HAVE_LIBSSL #undef HAVE_LIBSSL
/* Define to 1 if you have the `uv' library (-luv). */
#undef HAVE_LIBUV
/* Define to 1 if you have the `wldap32' library (-lwldap32). */ /* Define to 1 if you have the `wldap32' library (-lwldap32). */
#undef HAVE_LIBWLDAP32 #undef HAVE_LIBWLDAP32
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册