提交 644bde25 编写于 作者: P Pengzhou Tang 提交者: Tang Pengzhou

Fix a hung issue caused by gp_interconnect_id disorder

This issue is exposed when doing an experiment to remove the
special "eval_stable_functions" handling in evaluate_function(),
qp_functions_in_* test cases will get stuck sometimes and it turns
out to be a gp_interconnect_id disorder issue.

Under UDPIFC interconnect, gp_interconnect_id is used to
distinguish the executions of MPP-fied plan in the same session
and in the receiver side, packets with smaller gp_interconnect_id
is treated as 'past' packets, receiver will stop the sender to send
the packets.

The RCA of the hung is:
1. QD call InitSliceTable() to advance the gp_interconnect_id and
store it in slice table.
2. In CdbDispatchPlan->exec_make_plan_constant(), QD find some
stable function need to be simplified to const, then it executes
this function first.
3. The function contains the SQL, QD init another slice table and
advance the gp_interconnect_id again, QD dispatch the new plan and
execute it.
4. After the function is simplified to const, QD continues to dispatch
the previous plan, however, the gp_interconnect_id for it becomes the
older one. When a packet comes, if the receiver hasn't set up the
interconnect yet, the packet will be handled by handleMismatch() and
it will be treated as `past` packets and the senders will be stopped
earlier by the receiver. Later the receiver finish the setup of
interconnect, it cannot get any packets from senders and get stuck.

To resolve this, we advance the gp_interconnect_id when a plan is
really dispatched, the plan is dispatched sequentially, so the later
dispatched plan will have a higher gp_interconnect_id.

Also limit the usage of gp_interconnect_id in rx thread of UDPIFC,
we prefer to use sliceTable->ic_instance_id in main thread.
Reviewed-by: NHeikki Linnakangas <hlinnakangas@pivotal.io>
Reviewed-by: NAsim R P <apraveen@pivotal.io>
Reviewed-by: NHubert Zhang <hzhang@pivotal.io>
上级 23f8671a
......@@ -135,12 +135,6 @@ preprocess_initplans(QueryDesc *queryDesc)
* later Init plans can depend on previous ones.
*/
ExecSetParamPlan(sps, sps->planstate->ps_ExprContext, queryDesc);
/*
* We dispatched, and have returned. We may have used the
* interconnect; so let's bump the interconnect-id.
*/
queryDesc->estate->es_sliceTable->ic_instance_id = ++gp_interconnect_id;
}
}
......
......@@ -1071,6 +1071,8 @@ cdbdisp_dispatchX(QueryDesc* queryDesc,
nTotalSlices = sliceTbl->numSlices;
sliceVector = palloc0(nTotalSlices * sizeof(SliceVec));
nSlices = fillSliceVector(sliceTbl, rootIdx, sliceVector, nTotalSlices);
/* Each slice table has a unique-id. */
sliceTbl->ic_instance_id = ++gp_interconnect_id;
pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
......
......@@ -72,7 +72,7 @@ static void destroy_interconnect_handle(interconnect_handle_t *h);
static interconnect_handle_t *find_interconnect_handle(ChunkTransportState *icContext);
static void
logChunkParseDetails(MotionConn *conn)
logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id)
{
struct icpkthdr *pkt;
......@@ -82,7 +82,7 @@ logChunkParseDetails(MotionConn *conn)
pkt = (struct icpkthdr *) conn->pBuff;
elog(LOG, "Interconnect parse details: pkt->len %d pkt->seq %d pkt->flags 0x%x conn->active %d conn->stopRequest %d pkt->icId %d my_icId %d",
pkt->len, pkt->seq, pkt->flags, conn->stillActive, conn->stopRequested, pkt->icId, gp_interconnect_id);
pkt->len, pkt->seq, pkt->flags, conn->stillActive, conn->stopRequested, pkt->icId, ic_instance_id);
elog(LOG, "Interconnect parse details continued: peer: srcpid %d dstpid %d recvslice %d sendslice %d srccontent %d dstcontent %d",
pkt->srcPid, pkt->dstPid, pkt->recvSliceIndex, pkt->sendSliceIndex, pkt->srcContentId, pkt->dstContentId);
......@@ -120,7 +120,7 @@ RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates)
{
if (conn->msgSize - bytesProcessed < TUPLE_CHUNK_HEADER_SIZE)
{
logChunkParseDetails(conn);
logChunkParseDetails(conn, transportStates->sliceTable->ic_instance_id);
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
......@@ -148,7 +148,7 @@ RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates)
else
elog(LOG, "Interconnect error parsing message: no last item");
logChunkParseDetails(conn);
logChunkParseDetails(conn, transportStates->sliceTable->ic_instance_id);
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
......@@ -174,7 +174,7 @@ RecvTupleChunk(MotionConn *conn, ChunkTransportState *transportStates)
*/
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
logChunkParseDetails(conn);
logChunkParseDetails(conn, transportStates->sliceTable->ic_instance_id);
ereport(ERROR,
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
......
......@@ -762,6 +762,7 @@ sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEnt
{
int bytesToSend;
int bytesSent;
SliceTable *sliceTbl = transportStates->sliceTable;
if (conn->state != mcsSendRegMsg)
{
......@@ -806,7 +807,7 @@ sendRegisterMessage(ChunkTransportState *transportStates, ChunkTransportStateEnt
regMsg->srcListenerPort = Gp_listener_port & 0x0ffff;
regMsg->srcPid = MyProcPid;
regMsg->srcSessionId = gp_session_id;
regMsg->srcCommandCount = gp_interconnect_id;
regMsg->srcCommandCount = sliceTbl->ic_instance_id;
conn->state = mcsSendRegMsg;
......@@ -873,6 +874,7 @@ readRegisterMessage(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry = NULL;
CdbProcess *cdbproc = NULL;
ListCell *lc;
SliceTable *sliceTbl = transportStates->sliceTable;
/* Get ready to receive the Register message. */
if (conn->state != mcsRecvRegMsg)
......@@ -946,7 +948,7 @@ readRegisterMessage(ChunkTransportState *transportStates,
/* get rid of old connections first */
if (msg.srcSessionId != gp_session_id ||
msg.srcCommandCount < gp_interconnect_id)
msg.srcCommandCount < sliceTbl->ic_instance_id)
{
/*
* This is an old connection, which can be safely ignored. We get this
......@@ -957,7 +959,7 @@ readRegisterMessage(ChunkTransportState *transportStates,
elog(LOG, "Received invalid, old registration message: "
"will ignore ('expected:received' session %d:%d ic-id %d:%d)",
gp_session_id, msg.srcSessionId,
gp_interconnect_id, msg.srcCommandCount);
sliceTbl->ic_instance_id, msg.srcCommandCount);
goto old_conn;
}
......@@ -1263,8 +1265,6 @@ SetupTCPInterconnect(EState *estate)
Assert(sliceTable &&
mySlice->sliceIndex == sliceTable->localSlice);
gp_interconnect_id = interconnect_context->sliceTable->ic_instance_id;
gp_set_monotonic_begin_time(&startTime);
/* now we'll do some setup for each of our Receiving Motion Nodes. */
......
......@@ -437,6 +437,14 @@ struct ICGlobalControlInfo
/* Used by main thread to ask the background thread to exit. */
uint32 shutdown;
/*
* Used by ic thread in the QE to identify the current serving ic instance
* and handle the mismatch packets. It is not used by QD because QD may have
* cursors, QD may receive packets for open the cursors with lower instance
* id, QD use cursorHistoryTable to handle packets mismatch.
*/
uint32 ic_instance_id;
};
/*
......@@ -1432,6 +1440,7 @@ InitMotionUDPIFC(int *listenerSocketFd, uint16 *listenerPort)
InitLatch(&ic_control_info.latch);
ic_control_info.shutdown = 0;
ic_control_info.threadCreated = false;
ic_control_info.ic_instance_id = 0;
old = MemoryContextSwitchTo(ic_control_info.memContext);
......@@ -1743,7 +1752,7 @@ findConnByHeader(ConnHashTable *ht, icpkthdr *hdr)
write_log("findConnByHeader: not found! (hdr->srcPid %d "
"hdr->srcContentId %d hdr->dstContentId %d hdr->dstPid %d sess(%d:%d) cmd(%d:%d)) hashcode %d",
hdr->srcPid, hdr->srcContentId, hdr->dstContentId, hdr->dstPid, hdr->sessionId,
gp_session_id, hdr->icId, gp_interconnect_id, hashcode);
gp_session_id, hdr->icId, ic_control_info.ic_instance_id, hashcode);
return NULL;
}
......@@ -2794,6 +2803,7 @@ void
setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn)
{
CdbProcess *cdbProc = conn->cdbProc;
SliceTable *sliceTbl = transportStates->sliceTable;
Assert(conn->state == mcsSetupOutgoingConnection);
Assert(conn->cdbProc);
......@@ -2926,7 +2936,7 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS
conn->conn_info.dstListenerPort = conn->cdbProc->listenerPort;
conn->conn_info.sessionId = gp_session_id;
conn->conn_info.icId = gp_interconnect_id;
conn->conn_info.icId = sliceTbl->ic_instance_id;
connAddHash(&ic_control_info.connHtab, conn);
......@@ -3035,9 +3045,24 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
pthread_mutex_lock(&ic_control_info.lock);
gp_interconnect_id = sliceTable->ic_instance_id;
Assert(sliceTable->ic_instance_id > 0);
Assert(gp_interconnect_id > 0);
if (Gp_role == GP_ROLE_DISPATCH)
{
Assert(gp_interconnect_id == sliceTable->ic_instance_id);
/*
* QD use cursorHistoryTable to handle mismatch packets, no
* need to update ic_control_info.ic_instance_id
*/
}
else
{
/*
* update ic_control_info.ic_instance_id, it is mainly used
* by rx thread to handle mismatch packets
*/
ic_control_info.ic_instance_id = sliceTable->ic_instance_id;
}
interconnect_context = palloc0(sizeof(ChunkTransportState));
......@@ -3085,11 +3110,11 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
if (distTransId != rx_control_info.lastDXatId && rx_control_info.cursorHistoryTable.count > (2 * CURSOR_IC_TABLE_SIZE))
{
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, gp_interconnect_id);
pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id);
elog(DEBUG1, "prune cursor history table (count %d), icid %d", rx_control_info.cursorHistoryTable.count, sliceTable->ic_instance_id);
pruneCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id);
}
addCursorIcEntry(&rx_control_info.cursorHistoryTable, gp_interconnect_id, gp_command_count);
addCursorIcEntry(&rx_control_info.cursorHistoryTable, sliceTable->ic_instance_id, gp_command_count);
/* save the latest transaction id. */
rx_control_info.lastDXatId = distTransId;
......@@ -3161,7 +3186,7 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
conn->conn_info.dstPid = MyProcPid;
conn->conn_info.dstListenerPort = (Gp_listener_port >> 16) & 0x0ffff;
conn->conn_info.sessionId = gp_session_id;
conn->conn_info.icId = gp_interconnect_id;
conn->conn_info.icId = sliceTable->ic_instance_id;
conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER;
connAddHash(&ic_control_info.connHtab, conn);
......@@ -3212,9 +3237,9 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
ereport(DEBUG1, (errmsg("SetupUDPInterconnect will activate "
"%d incoming, %d outgoing routes for gp_interconnect_id %d. "
"%d incoming, %d outgoing routes for ic_instancce_id %d. "
"Listening on ports=%d/%d sockfd=%d.",
expectedTotalIncoming, expectedTotalOutgoing, gp_interconnect_id,
expectedTotalIncoming, expectedTotalOutgoing, sliceTable->ic_instance_id,
Gp_listener_port & 0x0ffff, (Gp_listener_port >> 16) & 0x0ffff, UDP_listenerFd)));
/*
......@@ -3613,7 +3638,7 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
"isSender %d isReceiver %d "
"snd_queue_depth %d recv_queue_depth %d Gp_max_packet_size %d "
"UNACK_QUEUE_RING_SLOTS_NUM %d TIMER_SPAN %lld DEFAULT_RTT %d "
"hasErrors %d, gp_interconnect_id %d ic_id_last_teardown %d "
"hasErrors %d, ic_instance_id %d ic_id_last_teardown %d "
"snd_buffer_pool.count %d snd_buffer_pool.maxCount %d snd_sock_bufsize %d recv_sock_bufsize %d "
"snd_pkt_count %d retransmits %d crc_errors %d"
" recv_pkt_count %d recv_ack_num %d"
......@@ -4263,6 +4288,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr
bool shouldSendBuffers = false;
SliceTable *sliceTbl = transportStates->sliceTable;
for (;;)
{
......@@ -4314,16 +4340,12 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr
/*
* read packet, is this the ack we want ?
*
* Here, using gp_interconnect_id is safe, since only senders get
* acks. QD (never be a sender) does not. QD may have several
* concurrent running interconnect instances.
*/
if (pkt->srcContentId == GpIdentity.segindex &&
pkt->srcPid == MyProcPid &&
pkt->srcListenerPort == ((Gp_listener_port >> 16) & 0x0ffff) &&
pkt->sessionId == gp_session_id &&
pkt->icId == gp_interconnect_id)
pkt->icId == sliceTbl->ic_instance_id)
{
/*
......@@ -4462,7 +4484,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr
pkt->srcListenerPort, ((Gp_listener_port >> 16) & 0x0ffff),
pkt->dstListenerPort,
pkt->sessionId, gp_session_id,
pkt->icId, gp_interconnect_id);
pkt->icId, sliceTbl->ic_instance_id);
}
}
......@@ -6383,15 +6405,15 @@ rxThreadFunc(void *arg)
* c) Normal execution: from past history transactions (should not happen).
*
* For QE:
* 1) pkt->id > Gp_interconnect_id : NAK it/Do nothing
* 1) pkt->id > ic_control_info.ic_instance_id : NAK it/Do nothing
* Causes: a) Start race
* b) Before Gp_interconnect_id is assigned to correct value, an error happened.
* 2) lastTornIcId < pkt->id == Gp_interconnect_id: NAK it/Do nothing
* Causes: a) Error reported after Gp_interconnect_id is set, and connections are
* b) Before ic_control_info.ic_instance_id is assigned to correct value, an error happened.
* 2) lastTornIcId < pkt->id == ic_control_info.ic_instance_id: NAK it/Do nothing
* Causes: a) Error reported after ic_control_info.ic_instance_id is set, and connections are
* not inserted to the hashtable yet, and before teardown is called.
* 3) lastTornIcId == pkt->id == Gp_interconnect_id: ACK it (with stop)
* 3) lastTornIcId == pkt->id == ic_control_info.ic_instance_id: ACK it (with stop)
* Causes: a) Normal execution: after teardown is called on current command
* 4) pkt->id < Gp_interconnect_id: NAK it/Do nothing/ACK it.
* 4) pkt->id < ic_control_info.ic_instance_id: NAK it/Do nothing/ACK it.
* Causes: a) Should not happen.
*
*/
......@@ -6465,7 +6487,7 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
/* The QEs get to use a simple counter. */
else if (Gp_role == GP_ROLE_EXECUTE)
{
if (gp_interconnect_id >= pkt->icId)
if (ic_control_info.ic_instance_id >= pkt->icId)
{
need_ack = true;
......@@ -6480,9 +6502,11 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
need_ack = false;
}
}
else /* gp_interconnect_id < pkt->icId, from the
* future */
else
{
/*
* ic_control_info.ic_instance_id < pkt->icId, from the future
*/
if (gp_interconnect_cache_future_packets)
{
cached = cacheFuturePacket(pkt, peer, peer_len);
......@@ -6505,7 +6529,7 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
if (DEBUG1 >= log_min_messages)
write_log("ACKING PACKET WITH FLAGS: pkt->seq %d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, gp_interconnect_id);
pkt->seq, dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, ic_control_info.ic_instance_id);
format_sockaddr(&dummyconn.peer, buf, sizeof(buf));
......@@ -6540,7 +6564,7 @@ handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
else
{
if (DEBUG1 >= log_min_messages)
write_log("dropping packet from command-id %d seq %d (my cmd %d)", pkt->icId, pkt->seq, gp_interconnect_id);
write_log("dropping packet from command-id %d seq %d (my cmd %d)", pkt->icId, pkt->seq, ic_control_info.ic_instance_id);
}
return cached;
......
......@@ -1304,9 +1304,6 @@ InitSliceTable(EState *estate, PlannedStmt *plannedstmt)
table->instrument_options = INSTRUMENT_NONE;
table->hasMotions = false;
/* Each slice table has a unique-id. */
table->ic_instance_id = ++gp_interconnect_id;
/*
* Initialize the executor slice table.
*
......
......@@ -212,7 +212,7 @@ testmode_sendto(const char *caller_name, int socket, const void *buffer,
break;
FAULT_INJECT_BACKUP_PKT();
msg->srcPid = -1; /* There is no such pid. */
msg->icId = gp_interconnect_id;
msg->icId = 0;
msg->seq = 1;
write_log("inject fault to sendto: FINC_PKT_MISMATCH");
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册