diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 7454f49ae56606050c9698d3635bda336bccc3d3..45b94d26282b10cc6bdd0db32e17937a3cad0182 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import java.util.List; +import java.util.StringJoiner; import org.slf4j.Logger; @@ -130,7 +131,11 @@ public abstract class AbstractTask { if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - logger.info(" -> {}", String.join("\n\t", logs)); + // note: if the logs is a SynchronizedList and will be modified concurrently, + // we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528) + StringJoiner joiner = new StringJoiner("\n\t"); + logs.forEach(joiner::add); + logger.info(" -> {}", joiner); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 5787907d6046bae56641d1a76776e8ab7b8f70e9..222c35593a37d7d101f9ded8a1629c334c0e0cd1 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.List; import org.junit.Assert; import org.junit.Before; @@ -37,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; - /** * sqoop task test */ @@ -201,4 +203,25 @@ public class SqoopTaskTest { } } + @Test + public void testLogHandler() throws InterruptedException { + List list = Collections.synchronizedList(new ArrayList<>()); + Thread thread1 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + list.add("test add log"); + } + }); + Thread thread2 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + sqoopTask.logHandle(list); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + // if no exception throw, assert true + Assert.assertTrue(true); + } + }