提交 81603499 编写于 作者: A Andrey Zagrebin 提交者: Stephan Ewen

[FLINK-20118][file connector] Introduce TaskManager and JobManager failures in...

[FLINK-20118][file connector] Introduce TaskManager and JobManager failures in FileSourceTextLinesITCase

This closes #14199
上级 767fe0a9
......@@ -18,18 +18,33 @@
package org.apache.flink.connector.file.src;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
......@@ -42,9 +57,15 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import static org.hamcrest.Matchers.equalTo;
......@@ -61,13 +82,57 @@ public class FileSourceTextLinesITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.build());
private static TestingMiniCluster miniCluster;
private static TestingEmbeddedHaServices highAvailabilityServices;
private static CompletedCheckpointStore checkpointStore;
@BeforeClass
public static void setupMiniCluster() throws Exception {
highAvailabilityServices = new HaServices(TestingUtils.defaultExecutor(),
() -> checkpointStore,
new StandaloneCheckpointIDCounter());
final Configuration configuration = createConfiguration();
miniCluster = new TestingMiniCluster(
new TestingMiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.build(),
() -> highAvailabilityServices);
miniCluster.start();
}
private static Configuration createConfiguration() throws IOException {
final Configuration configuration = new Configuration();
final String checkPointDir = Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkPointDir);
return configuration;
}
@Before
public void setup() {
checkpointStore = new RecoverableCompletedCheckpointStore();
}
@AfterClass
public static void shutdownMiniCluster() throws Exception {
if (miniCluster != null) {
miniCluster.close();
}
if (highAvailabilityServices != null) {
highAvailabilityServices.closeAndCleanupAllData();
highAvailabilityServices = null;
}
}
// ------------------------------------------------------------------------
// test cases
// ------------------------------------------------------------------------
/**
......@@ -75,6 +140,28 @@ public class FileSourceTextLinesITCase extends TestLogger {
*/
@Test
public void testBoundedTextFileSource() throws Exception {
testBoundedTextFileSource(FailoverType.NONE);
}
/**
* This test runs a job reading bounded input with a stream record format (text lines)
* and restarts TaskManager.
*/
@Test
public void testBoundedTextFileSourceWithTaskManagerFailover() throws Exception {
testBoundedTextFileSource(FailoverType.TM);
}
/**
* This test runs a job reading bounded input with a stream record format (text lines)
* and triggers JobManager failover.
*/
@Test
public void testBoundedTextFileSourceWithJobManagerFailover() throws Exception {
testBoundedTextFileSource(FailoverType.JM);
}
private void testBoundedTextFileSource(FailoverType failoverType) throws Exception {
final File testDir = TMP_FOLDER.newFolder();
// our main test data
......@@ -84,18 +171,32 @@ public class FileSourceTextLinesITCase extends TestLogger {
writeHiddenJunkFiles(testDir);
final FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.build();
.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
env.setParallelism(PARALLELISM);
final DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"file-source");
source,
WatermarkStrategy.noWatermarks(),
"file-source");
final DataStream<String> streamFailingInTheMiddleOfReading =
RecordCounterToFail.wrapWithFailureAfter(stream, LINES.length / 2);
final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
final ClientAndIterator<String> client = DataStreamUtils.collectWithClient(
streamFailingInTheMiddleOfReading,
"Bounded TextFiles Test");
final JobID jobId = client.client.getJobID();
RecordCounterToFail.waitToFail();
triggerFailover(failoverType, jobId, RecordCounterToFail::continueProcessing);
final List<String> result = new ArrayList<>();
while (client.iterator.hasNext()) {
result.add(client.iterator.next());
}
verifyResult(result);
}
......@@ -106,23 +207,47 @@ public class FileSourceTextLinesITCase extends TestLogger {
*/
@Test
public void testContinuousTextFileSource() throws Exception {
testContinuousTextFileSource(FailoverType.NONE);
}
/**
* This test runs a job reading continuous input (files appearing over time)
* with a stream record format (text lines) and restarts TaskManager.
*/
@Test
public void testContinuousTextFileSourceWithTaskManagerFailover() throws Exception {
testContinuousTextFileSource(FailoverType.TM);
}
/**
* This test runs a job reading continuous input (files appearing over time)
* with a stream record format (text lines) and triggers JobManager failover.
*/
@Test
public void testContinuousTextFileSourceWithJobManagerFailover() throws Exception {
testContinuousTextFileSource(FailoverType.JM);
}
private void testContinuousTextFileSource(FailoverType type) throws Exception {
final File testDir = TMP_FOLDER.newFolder();
final FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.monitorContinuously(Duration.ofMillis(5))
.build();
.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
.monitorContinuously(Duration.ofMillis(5))
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamExecutionEnvironment env = new TestStreamEnvironment(miniCluster, PARALLELISM);
env.setParallelism(PARALLELISM);
env.enableCheckpointing(10L);
final DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"file-source");
source,
WatermarkStrategy.noWatermarks(),
"file-source");
final ClientAndIterator<String> client =
DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
final JobID jobId = client.client.getJobID();
// write one file, execute, and wait for its result
// that way we know that the application was running and the source has
......@@ -138,9 +263,15 @@ public class FileSourceTextLinesITCase extends TestLogger {
for (int i = 1; i < LINES_PER_FILE.length; i++) {
Thread.sleep(10);
writeFile(testDir, i);
final boolean failAfterHalfOfInput = i == LINES_PER_FILE.length / 2;
if (failAfterHalfOfInput) {
triggerFailover(type, jobId, () -> {});
}
}
final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(
client,
numLinesAfter);
// shut down the job, now that we have all the results we expected.
client.client.cancel().get();
......@@ -149,6 +280,45 @@ public class FileSourceTextLinesITCase extends TestLogger {
verifyResult(result1);
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
private enum FailoverType {
NONE,
TM,
JM
}
private static void triggerFailover(
FailoverType type,
JobID jobId,
Runnable afterFailAction) throws Exception {
switch (type) {
case NONE:
afterFailAction.run();
break;
case TM:
restartTaskManager(afterFailAction);
break;
case JM:
triggerJobManagerFailover(jobId, afterFailAction);
break;
}
}
private static void triggerJobManagerFailover(JobID jobId, Runnable afterFailAction) throws Exception {
highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
highAvailabilityServices.grantJobMasterLeadership(jobId).get();
}
private static void restartTaskManager(Runnable afterFailAction) throws Exception {
miniCluster.terminateTaskExecutor(0).get();
afterFailAction.run();
miniCluster.startTaskExecutor();
}
// ------------------------------------------------------------------------
// verification
// ------------------------------------------------------------------------
......@@ -308,4 +478,87 @@ public class FileSourceTextLinesITCase extends TestLogger {
assertTrue(stagingFile.renameTo(file));
}
// ------------------------------------------------------------------------
// mini cluster failover utilities
// ------------------------------------------------------------------------
private static class HaServices extends TestingEmbeddedHaServices {
private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
private final CheckpointIDCounter checkpointIDCounter;
private HaServices(
Executor executor,
Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
CheckpointIDCounter checkpointIDCounter) {
super(executor);
this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
this.checkpointIDCounter = checkpointIDCounter;
}
@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
return new CheckpointRecoveryFactoryWithSettableStore(
completedCheckpointStoreFactory,
checkpointIDCounter);
}
}
private static class CheckpointRecoveryFactoryWithSettableStore implements CheckpointRecoveryFactory {
private final Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory;
private final CheckpointIDCounter checkpointIDCounter;
private CheckpointRecoveryFactoryWithSettableStore(
Supplier<CompletedCheckpointStore> completedCheckpointStoreFactory,
CheckpointIDCounter checkpointIDCounter) {
this.completedCheckpointStoreFactory = completedCheckpointStoreFactory;
this.checkpointIDCounter = checkpointIDCounter;
}
@Override
public CompletedCheckpointStore createCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
ClassLoader userClassLoader) {
return completedCheckpointStoreFactory.get();
}
@Override
public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
return checkpointIDCounter;
}
}
private static class RecordCounterToFail {
private static AtomicInteger records;
private static CompletableFuture<Void> fail;
private static CompletableFuture<Void> continueProcessing;
private static <T> DataStream<T> wrapWithFailureAfter(
DataStream<T> stream,
int failAfter) {
records = new AtomicInteger();
fail = new CompletableFuture<>();
continueProcessing = new CompletableFuture<>();
return stream.map(record -> {
final boolean halfOfInputIsRead = records.incrementAndGet() > failAfter;
final boolean notFailedYet = !fail.isDone();
if (notFailedYet && halfOfInputIsRead) {
fail.complete(null);
continueProcessing.get();
}
return record;
});
}
private static void waitToFail() throws ExecutionException, InterruptedException {
fail.get();
}
private static void continueProcessing() {
continueProcessing.complete(null);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册