提交 79ff4e62 编写于 作者: N Ning Yu

ic-proxy: handle early coming BYE correctly

In a query that contains multiple init/sub plans, the packets of the
second subplan might be received while the first is still being
processed in the ic-proxy mode, this is because in ic-proxy mode a local
host handshake is used instead of the global one.

To distinguish the packets of different subplans, especially for the
early coming ones, we must stop handling on the BYE immediately, and
pass any unhandled early coming pkts to the successor or the
placeholder.

This fixes the random hanging during the ICW parallel group of
qp_functions_in_from.  No new test is added.
Co-authored-by: NHubert Zhang <hzhang@pivotal.io>
Co-authored-by: NNing Yu <nyu@pivotal.io>
上级 cf23db49
...@@ -342,6 +342,11 @@ ic_proxy_client_register(ICProxyClient *client) ...@@ -342,6 +342,11 @@ ic_proxy_client_register(ICProxyClient *client)
* Unregister a client. * Unregister a client.
* *
* If it has a successor, trigger its registration. * If it has a successor, trigger its registration.
*
* If there are still unhandled pkts, transfer them to the successor or the
* placeholder. This happens on loopback connections, due to the limitation of
* the delayed delivery, pkts can be pushed to a p2c shutting/shutted client,
* these pkts belong to the next subplan in the same query.
*/ */
static void static void
ic_proxy_client_unregister(ICProxyClient *client) ic_proxy_client_unregister(ICProxyClient *client)
...@@ -364,6 +369,16 @@ ic_proxy_client_unregister(ICProxyClient *client) ...@@ -364,6 +369,16 @@ ic_proxy_client_unregister(ICProxyClient *client)
successor = client->successor; successor = client->successor;
client->successor = NULL; client->successor = NULL;
if (client->pkts)
{
ic_proxy_log(LOG, "%s: transfer %d unhandled pkts to my successor",
ic_proxy_client_get_name(client),
list_length(client->pkts));
ic_proxy_client_cache_p2c_pkts(successor, client->pkts);
client->pkts = NIL;
}
ic_proxy_log(LOG, "%s: re-register my successor", ic_proxy_log(LOG, "%s: re-register my successor",
ic_proxy_client_get_name(client)); ic_proxy_client_get_name(client));
...@@ -377,6 +392,22 @@ ic_proxy_client_unregister(ICProxyClient *client) ...@@ -377,6 +392,22 @@ ic_proxy_client_unregister(ICProxyClient *client)
ic_proxy_client_maybe_start_read_data(successor); ic_proxy_client_maybe_start_read_data(successor);
} }
else if (client->pkts)
{
ICProxyClient *placeholder;
ic_proxy_log(LOG, "%s: transfer %d unhandled pkts to my placeholder",
ic_proxy_client_get_name(client),
list_length(client->pkts));
placeholder = ic_proxy_client_new(client->pipe.loop,
true /* placeholder */);
placeholder->key = client->key;
ic_proxy_client_register(placeholder);
ic_proxy_client_cache_p2c_pkts(placeholder, client->pkts);
client->pkts = NIL;
}
} }
/* /*
...@@ -553,16 +584,30 @@ ic_proxy_client_maybe_start_read_data(ICProxyClient *client) ...@@ -553,16 +584,30 @@ ic_proxy_client_maybe_start_read_data(ICProxyClient *client)
ic_proxy_message_init(ic_proxy_obuf_ensure_buffer(&client->obuf), ic_proxy_message_init(ic_proxy_obuf_ensure_buffer(&client->obuf),
0, &client->key); 0, &client->key);
#ifdef USE_ASSERT_CHECKING
/* /*
* some pkts might arrived before the client, handle them before reading * this should never happen because no p2c pkt is handled yet, the early
* new pkts. * coming pkts are in the cache list, we are just going to handle them.
*/ */
ic_proxy_client_handle_p2c_cache(client); if (client->state &
(IC_PROXY_CLIENT_STATE_C2P_SHUTTING |
IC_PROXY_CLIENT_STATE_P2C_SHUTTING))
{
ic_proxy_log(WARNING,
"%s: state=0x%08x: unexpected shutting down in progress",
ic_proxy_client_get_name(client), client->state);
return;
}
#endif /* USE_ASSERT_CHECKING */
/* /*
* now it's time to receive the normal data * now it's time to receive the normal data, it is important to start the
* reading before handling the cached pkts, so if the cached pkts contain
* the BYE we could stop the reading in time.
*/ */
ic_proxy_client_read_data(client); ic_proxy_client_read_data(client);
ic_proxy_client_handle_p2c_cache(client);
} }
/* /*
...@@ -1267,8 +1312,7 @@ ic_proxy_client_drop_p2c_cache(ICProxyClient *client) ...@@ -1267,8 +1312,7 @@ ic_proxy_client_drop_p2c_cache(ICProxyClient *client)
static void static void
ic_proxy_client_handle_p2c_cache(ICProxyClient *client) ic_proxy_client_handle_p2c_cache(ICProxyClient *client)
{ {
List *pkts; int count = 0;
ListCell *cell;
/* A placeholder does not handle any packets */ /* A placeholder does not handle any packets */
if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER) if (client->state & IC_PROXY_CLIENT_STATE_PLACEHOLDER)
...@@ -1280,28 +1324,29 @@ ic_proxy_client_handle_p2c_cache(ICProxyClient *client) ...@@ -1280,28 +1324,29 @@ ic_proxy_client_handle_p2c_cache(ICProxyClient *client)
ic_proxy_log(LOG, "%s: trying to consume the %d cached pkts", ic_proxy_log(LOG, "%s: trying to consume the %d cached pkts",
ic_proxy_client_get_name(client), list_length(client->pkts)); ic_proxy_client_get_name(client), list_length(client->pkts));
/* First detach all the pkts */ /*
pkts = client->pkts; * Consume the pkts one by one, stop immediately if the client begin to
client->pkts = NIL; * shutdown in the p2c direction.
*/
/* Then re-handle them one by one */ while (client->pkts != NIL &&
foreach(cell, pkts) !(client->state & IC_PROXY_CLIENT_STATE_P2C_SHUTTING))
{ {
ICProxyPkt *pkt = lfirst(cell); ICProxyPkt *pkt = linitial(client->pkts);
count++;
/*
* The pkt's ownership is taken by ic_proxy_client_on_p2c_data(), so
* only need to free the list cell itself.
*/
client->pkts = list_delete_first(client->pkts);
/* FIXME: callback */ /* FIXME: callback */
ic_proxy_client_on_p2c_data(client, pkt, NULL, NULL); ic_proxy_client_on_p2c_data(client, pkt, NULL, NULL);
} }
ic_proxy_log(LOG, "%s: consumed %d cached pkts", ic_proxy_log(LOG, "%s: consumed %d cached pkts",
ic_proxy_client_get_name(client), ic_proxy_client_get_name(client), count);
list_length(pkts) - list_length(client->pkts));
/*
* the pkts ownership were transfered during ic_proxy_client_on_p2c_data(),
* only need to free the list itself.
*/
list_free(pkts);
} }
/* /*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册