提交 adcce517 编写于 作者: V Vladsz83 提交者: ymolochkov

IGNITE-13705 : Another node fails with failure of target node. (#8484)

(cherry picked from commit edb736dc)
上级 5c8f1d16
......@@ -214,6 +214,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Interval of checking connection to next node in the ring. */
private long connCheckInterval;
/** Fundamental value for connection checking actions. */
private long connCheckTick;
/** */
private IgniteThreadPoolExecutor utilityPool;
......@@ -385,9 +388,12 @@ class ServerImpl extends TcpDiscoveryImpl {
lastRingMsgSentTime = 0;
// Foundumental timeout value for actions related to connection check.
connCheckTick = effectiveExchangeTimeout() / 3;
// Since we take in account time of last sent message, the interval should be quite short to give enough piece
// of failure detection timeout as send-and-acknowledge timeout of the message to send.
connCheckInterval = Math.min(effectiveExchangeTimeout() / 4, MAX_CON_CHECK_INTERVAL);
connCheckInterval = Math.min(connCheckTick, MAX_CON_CHECK_INTERVAL);
utilityPool = new IgniteThreadPoolExecutor("disco-pool",
spi.ignite().name(),
......@@ -3510,12 +3516,19 @@ class ServerImpl extends TcpDiscoveryImpl {
if (changeTop)
hndMsg.changeTopology(ring.previousNodeOf(next).id());
if (log.isDebugEnabled())
log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState + ']');
if (log.isDebugEnabled()) {
log.debug("Sending handshake [hndMsg=" + hndMsg + ", sndState=" + sndState +
"] with timeout " + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
}
spi.writeToSocket(sock, out, hndMsg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
if (log.isDebugEnabled()) {
log.debug("Reading handshake response with timeout " +
timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
......@@ -6526,6 +6539,26 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
/**
* Creates proper timeout helper taking in account current send state and ring state.
*
* @param sndState Current connection recovering state. Ignored if {@code null}.
* @param lastOperationNanos Time of last related operation. Ignored if negative or 0.
* @return Timeout helper.
*/
private IgniteSpiOperationTimeoutHelper serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
long lastOperationNanos) {
long absoluteThreshold = -1;
// Active send-state means we lost connection to next node and have to find another. We don't know how many
// nodes failed. May be several failed in a row. But we got only one connectionRecoveryTimeout to establish new
// connection. We should travers rest of the cluster with sliced timeout for each node.
if (sndState != null)
absoluteThreshold = Math.min(sndState.failTimeNanos, System.nanoTime() + U.millisToNanos(connCheckTick));
return new IgniteSpiOperationTimeoutHelper(spi, true, lastOperationNanos, absoluteThreshold);
}
/** Fixates time of last sent message. */
private void updateLastSentMessageTime() {
lastRingMsgSentTime = System.nanoTime();
......@@ -6887,13 +6920,22 @@ class ServerImpl extends TcpDiscoveryImpl {
(req.checkPreviousNodeId() == null || previous.id().equals(req.checkPreviousNodeId()))) {
Collection<InetSocketAddress> nodeAddrs = spi.getNodeAddresses(previous, false);
liveAddr = checkConnection(new ArrayList<>(nodeAddrs),
(int)U.nanosToMillis(timeThreshold - now));
// The connection recovery connection to one node is connCheckTick.
// We need to suppose network delays. So we use half of this time.
int backwardCheckTimeout = (int)(connCheckTick / 2);
if (log.isDebugEnabled()) {
log.debug("Remote node requests topology change. Checking connection to " +
"previous [" + previous + "] with timeout " + backwardCheckTimeout);
}
liveAddr = checkConnection(new ArrayList<>(nodeAddrs), backwardCheckTimeout);
if (log.isInfoEnabled())
log.info("Connection check done [liveAddr=" + liveAddr
+ ", previousNode=" + previous + ", addressesToCheck=" + nodeAddrs
+ ", connectingNodeId=" + nodeId + ']');
if (log.isInfoEnabled()) {
log.info("Connection check to previous node done: [liveAddr=" + liveAddr
+ ", previousNode=" + U.toShortString(previous) + ", addressesToCheck=" +
nodeAddrs + ", connectingNodeId=" + nodeId + ']');
}
}
// If local node was able to connect to previous, confirm that it's alive.
......@@ -6912,6 +6954,11 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
if (log.isDebugEnabled()) {
log.debug("Sending handshake response [" + res + "] with timeout " +
spi.getEffectiveSocketTimeout(srvSock) + " to " + rmtAddr + ":" + sock.getPort());
}
spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
......
......@@ -21,7 +21,11 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.Ignite;
......@@ -205,6 +209,74 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {
failedNodes.isEmpty());
}
/**
* Ensures sequential failure of two nodes has no additional issues.
*/
@Test
public void testSequentialFailTwoNodes() throws Exception {
simulateFailureOfTwoNodes(true);
}
/**
* Ensures sequential failure of two nodes has no additional issues.
*/
@Test
public void testNotSequentialFailTwoNodes() throws Exception {
simulateFailureOfTwoNodes(false);
}
/** */
private void simulateFailureOfTwoNodes(boolean sequentionally) throws Exception {
failureDetectionTimeout = 1000;
int gridCnt = 7;
startGrids(gridCnt);
awaitPartitionMapExchange();
final CountDownLatch failLatch = new CountDownLatch(2);
for (int i = 0; i < gridCnt; i++) {
ignite(i).events().localListen(evt -> {
failLatch.countDown();
return true;
}, EVT_NODE_FAILED);
int nodeIdx = i;
ignite(i).events().localListen(evt -> {
segmentedNodes.add(nodeIdx);
return true;
}, EVT_NODE_SEGMENTED);
}
Set<Integer> failedNodes = new HashSet<>();
failedNodes.add(2);
if (sequentionally)
failedNodes.add(3);
else
failedNodes.add(4);
failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::suspend));
try {
failLatch.await(10, TimeUnit.SECONDS);
}
finally {
failedNodes.forEach(idx -> processNetworkThreads(ignite(idx), Thread::resume));
}
for (int i = 0; i < gridCnt; i++) {
if (!failedNodes.contains(i))
assertFalse(segmentedNodes.contains(i));
}
}
/**
* @param ig Ignite instance to get failedNodes collection from.
*/
......@@ -244,9 +316,9 @@ public class TcpDiscoveryNetworkIssuesTest extends GridCommonAbstractTest {
CommunicationSpi<?> comm = ignite.configuration().getCommunicationSpi();
GridNioServer<?> nioServerWrapper = U.field(comm, "nioSrvr");
GridNioServer<?> gridNioServer = U.field(comm, "nioSrvr");
for (GridWorker worker : nioServerWrapper.workers())
for (GridWorker worker : gridNioServer.workers())
proc.accept(worker.runner());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册