diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java index 6878d5a8b1f4b39dfc05c9837a13bf45fd833872..021f10d44480065e08012124e90a654733061090 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; + import org.slf4j.LoggerFactory; import java.util.ArrayList; @@ -123,7 +125,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { } private void initTaskParameters() { - this.taskInstance.setLogPath(getTaskLogPath(taskInstance)); + this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index cfe885cbfaffce322d6b3399c43034148b4cd9be..319afedd7b424106b291cf43dbae170ffeaec205 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.runner; -import com.fasterxml.jackson.annotation.JsonFormat; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -24,16 +26,22 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.DependentExecute; -import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonFormat; public class DependentTaskExecThread extends MasterBaseTaskExecThread { @@ -172,7 +180,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { } private void initTaskParameters() { - taskInstance.setLogPath(getTaskLogPath(taskInstance)); + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index cf5359d57957ce504e33aa0ff538bf72aa358e35..ea3ad1995018ea98c8b93d0ce94dd8790af6460f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -36,10 +35,6 @@ import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; - - /** * master task exec base class */ @@ -85,11 +80,13 @@ public class MasterBaseTaskExecThread implements Callable { * taskUpdateQueue */ private TaskPriorityQueue taskUpdateQueue; + /** * constructor of MasterBaseTaskExecThread - * @param taskInstance task instance + * + * @param taskInstance task instance */ - public MasterBaseTaskExecThread(TaskInstance taskInstance){ + public MasterBaseTaskExecThread(TaskInstance taskInstance) { this.processService = SpringApplicationContext.getBean(ProcessService.class); this.alertDao = SpringApplicationContext.getBean(AlertDao.class); this.cancel = false; @@ -100,24 +97,26 @@ public class MasterBaseTaskExecThread implements Callable { /** * get task instance + * * @return TaskInstance */ - public TaskInstance getTaskInstance(){ + public TaskInstance getTaskInstance() { return this.taskInstance; } /** * kill master base task exec thread */ - public void kill(){ + public void kill() { this.cancel = true; } /** * submit master base task exec thread + * * @return TaskInstance */ - protected TaskInstance submit(){ + protected TaskInstance submit() { Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); @@ -156,14 +155,13 @@ public class MasterBaseTaskExecThread implements Callable { } - /** * dispatcht task + * * @param taskInstance taskInstance * @return whether submit task success */ public Boolean dispatchTask(TaskInstance taskInstance) { - try{ if(taskInstance.isConditionsTask() || taskInstance.isDependTask() @@ -202,7 +200,7 @@ public class MasterBaseTaskExecThread implements Callable { /** - * buildTaskPriorityInfo + * buildTaskPriorityInfo * * @param processInstancePriority processInstancePriority * @param processInstanceId processInstanceId @@ -215,7 +213,7 @@ public class MasterBaseTaskExecThread implements Callable { int processInstanceId, int taskInstancePriority, int taskInstanceId, - String workerGroup){ + String workerGroup) { return processInstancePriority + UNDERLINE + processInstanceId + @@ -229,14 +227,16 @@ public class MasterBaseTaskExecThread implements Callable { /** * submit wait complete + * * @return true */ - protected Boolean submitWaitComplete(){ + protected Boolean submitWaitComplete() { return true; } /** * call + * * @return boolean * @throws Exception exception */ @@ -246,34 +246,4 @@ public class MasterBaseTaskExecThread implements Callable { return submitWaitComplete(); } - /** - * get task log path - * @return log path - */ - public String getTaskLogPath(TaskInstance task) { - String logPath; - try{ - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - logPath = baseLog + Constants.SINGLE_SLASH + - task.getProcessDefinitionId() + Constants.SINGLE_SLASH + - task.getProcessInstanceId() + Constants.SINGLE_SLASH + - task.getId() + ".log"; - }else{ - logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - task.getProcessDefinitionId() + Constants.SINGLE_SLASH + - task.getProcessInstanceId() + Constants.SINGLE_SLASH + - task.getId() + ".log"; - } - }catch (Exception e){ - logger.error("logger", e); - logPath = ""; - } - return logPath; - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..93008b9d643c9198f49864c48271dac933490783 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +import javax.transaction.NotSupportedException; + +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.sift.SiftingAppender; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.AppenderAttachable; + +public class LogUtils { + + private LogUtils() throws NotSupportedException { + throw new NotSupportedException(); + } + + /** + * get task log path + */ + @SuppressWarnings("unchecked") + private static String getTaskLogPath(int processDefinitionId, int processInstanceId, int taskInstanceId) { + // Optional.map will be skipped if null + return Optional.of(LoggerFactory.getILoggerFactory()) + .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) + .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) + .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) + .map(TaskLogDiscriminator::getLogBase) + .map(e -> Paths.get(e) + .toAbsolutePath() + .resolve(String.valueOf(processDefinitionId)) + .resolve(String.valueOf(processInstanceId)) + .resolve(taskInstanceId + ".log")) + .map(Path::toString) + .orElse(""); + } + + /** + * get task log path by TaskInstance + */ + public static String getTaskLogPath(TaskInstance taskInstance) { + return getTaskLogPath(taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId()); + } + + /** + * get task log path by TaskExecutionContext + */ + public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) { + return getTaskLogPath(taskExecutionContext.getProcessId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 2dc9bb2ce9f7d96aab37d5ed473632e251d60b93..3717ce37ae51a28357c22a67d37e85b871aba55e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; - -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -36,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -51,8 +49,6 @@ import org.slf4j.LoggerFactory; import com.github.rholder.retry.RetryException; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; import io.netty.channel.Channel; /** @@ -154,28 +150,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } } - /** - * get task log path - * @return log path - */ - private String getTaskLogPath(TaskExecutionContext taskExecutionContext) { - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - return baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - return System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - /** * build ack command * @param taskExecutionContext taskExecutionContext @@ -185,7 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); - ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); + ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(taskExecutionContext.getStartTime()); if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index f29691e9bb13b85ec0dded58b8f4971a9b042dc9..f6fdfaab632412f3674a7bc10d1f5bb7b475e99f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -17,14 +17,15 @@ package org.apache.dolphinscheduler.server.master.runner; -import java.util.HashSet; -import java.util.Set; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.HashSet; +import java.util.Set; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..02cca399ed6979261ba1e879a1d2fe35b05a63d1 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.sift.SiftingAppender; + +@RunWith(MockitoJUnitRunner.class) +public class LogUtilsTest { + + @Test + public void testGetTaskLogPath() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(100); + taskInstance.setId(1000); + + Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT"); + Assert.assertNotNull(rootLogger); + + SiftingAppender appender = Mockito.mock(SiftingAppender.class); + // it's a trick to mock logger.getAppend("TASKLOGFILE") + Mockito.when(appender.getName()).thenReturn("TASKLOGFILE"); + rootLogger.addAppender(appender); + + Path logBase = Paths.get("path").resolve("to").resolve("test"); + + TaskLogDiscriminator taskLogDiscriminator = Mockito.mock(TaskLogDiscriminator.class); + Mockito.when(taskLogDiscriminator.getLogBase()).thenReturn(logBase.toString()); + Mockito.when(appender.getDiscriminator()).thenReturn(taskLogDiscriminator); + + Path logPath = Paths.get(".").toAbsolutePath().getParent() + .resolve(logBase) + .resolve("1").resolve("100").resolve("1000.log"); + Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskInstance)); + } + +} diff --git a/pom.xml b/pom.xml index ef57a3d1809d72d9e7e6eeb97ab3c1948ec2f094..0321c1a24c628bb8aa37fbbd9a562d86fd1f0cb0 100644 --- a/pom.xml +++ b/pom.xml @@ -833,6 +833,7 @@ **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java + **/server/utils/LogUtilsTest.java **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java