From 38e485373de9a7301a19f57cb53dbf4cb6744696 Mon Sep 17 00:00:00 2001 From: Yichao Yang <1048262223@qq.com> Date: Sun, 28 Jun 2020 17:43:13 +0800 Subject: [PATCH] [Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965) * [Feature-2925][common] Add exitVal judge in OSUtils.exeCmd (#2925) * optimize the logger utils --- .../common/shell/AbstractShell.java | 40 ++++--- .../common/utils/FileUtils.java | 60 +++++++--- .../common/utils/LoggerUtils.java | 27 ++++- .../common/utils/OSUtils.java | 103 +++++++++++------- .../common/utils/OSUtilsTest.java | 6 +- .../server/log/TaskLogFilter.java | 9 +- .../processor/TaskExecuteProcessor.java | 42 +++++-- .../worker/runner/TaskExecuteThread.java | 36 +++--- 8 files changed, 220 insertions(+), 103 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java index f846b1974..aafdb8601 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java @@ -16,9 +16,6 @@ */ package org.apache.dolphinscheduler.common.shell; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -30,6 +27,9 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A base class for running a Unix command. @@ -128,7 +128,7 @@ public abstract class AbstractShell { /** * Run a command actual work */ - private void runCommand() throws IOException { + private void runCommand() throws IOException { ProcessBuilder builder = new ProcessBuilder(getExecString()); Timer timeOutTimer = null; ShellTimeoutTimerTask timeoutTimerTask = null; @@ -153,11 +153,11 @@ public abstract class AbstractShell { timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); } final BufferedReader errReader = - new BufferedReader(new InputStreamReader(process - .getErrorStream())); - BufferedReader inReader = - new BufferedReader(new InputStreamReader(process - .getInputStream())); + new BufferedReader( + new InputStreamReader(process.getErrorStream())); + BufferedReader inReader = + new BufferedReader( + new InputStreamReader(process.getInputStream())); final StringBuilder errMsg = new StringBuilder(); // read error and input streams as this would free up the buffers @@ -177,23 +177,35 @@ public abstract class AbstractShell { } } }; + Thread inThread = new Thread() { + @Override + public void run() { + try { + parseExecResult(inReader); + } catch (IOException ioe) { + logger.warn("Error reading the in stream", ioe); + } + super.run(); + } + }; try { errThread.start(); + inThread.start(); } catch (IllegalStateException ise) { } try { // parse the output - parseExecResult(inReader); - exitCode = process.waitFor(); + exitCode = process.waitFor(); try { - // make sure that the error thread exits + // make sure that the error and in thread exits errThread.join(); + inThread.join(); } catch (InterruptedException ie) { - logger.warn("Interrupted while reading the error stream", ie); + logger.warn("Interrupted while reading the error and in stream", ie); } completed.set(true); //the timeout thread handling //taken care in finally block - if (exitCode != 0) { + if (exitCode != 0 || errMsg.length() > 0) { throw new ExitCodeException(exitCode, errMsg.toString()); } } catch (InterruptedException ie) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index bae8f7f9b..de3d42974 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -16,18 +16,32 @@ */ package org.apache.dolphinscheduler.common.utils; +import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE; +import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringReader; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Optional; + import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.nio.charset.Charset; -import java.nio.charset.UnsupportedCharsetException; - -import static org.apache.dolphinscheduler.common.Constants.*; - /** * file utils */ @@ -36,6 +50,8 @@ public class FileUtils { public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler"); + public static final ThreadLocal taskLoggerThreadLocal = new ThreadLocal<>(); + /** * get file suffix * @@ -118,7 +134,7 @@ public class FileUtils { String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), Integer.toString(processDefineId), Integer.toString(processInstanceId)); File file = new File(fileName); - if (!file.getParentFile().exists()){ + if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -138,24 +154,40 @@ public class FileUtils { * @param userName user name * @throws IOException errors */ - public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{ + public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException { //if work dir exists, first delete File execLocalPathFile = new File(execLocalPath); - if (execLocalPathFile.exists()){ + if (execLocalPathFile.exists()) { org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); } //create work dir org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile); - logger.info("create dir success {}" , execLocalPath); - + String mkdirLog = "create dir success " + execLocalPath; + LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog); //if not exists this user,then create - if (!OSUtils.getUserList().contains(userName)){ - OSUtils.createUser(userName); + OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get()); + try { + if (!OSUtils.getUserList().contains(userName)) { + boolean isSuccessCreateUser = OSUtils.createUser(userName); + + String infoLog; + if (isSuccessCreateUser) { + infoLog = String.format("create user name success %s", userName); + } else { + infoLog = String.format("create user name fail %s", userName); + } + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog); + } + } catch (Throwable e) { + LoggerUtils.logError(Optional.ofNullable(logger), e); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e); } - logger.info("create user name success {}", userName); + OSUtils.taskLoggerThreadLocal.remove(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 191df335c..e3cf652ef 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -16,14 +16,15 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.Constants; -import org.slf4j.Logger; - import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.dolphinscheduler.common.Constants; +import org.slf4j.Logger; + /** * logger utils */ @@ -93,4 +94,24 @@ public class LoggerUtils { } return appIds; } + + public static void logError(Optional optionalLogger + , String error) { + optionalLogger.ifPresent((Logger logger) -> logger.error(error)); + } + + public static void logError(Optional optionalLogger + , Throwable e) { + optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e)); + } + + public static void logError(Optional optionalLogger + , String error, Throwable e) { + optionalLogger.ifPresent((Logger logger) -> logger.error(error, e)); + } + + public static void logInfo(Optional optionalLogger + , String info) { + optionalLogger.ifPresent((Logger logger) -> logger.info(info)); + } } \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index e3b2cc272..171a017b4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -16,16 +16,6 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.shell.ShellExecutor; -import org.apache.commons.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import oshi.SystemInfo; -import oshi.hardware.CentralProcessor; -import oshi.hardware.GlobalMemory; -import oshi.hardware.HardwareAbstractionLayer; - import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; @@ -40,8 +30,21 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.StringTokenizer; import java.util.regex.Pattern; +import org.apache.commons.configuration.Configuration; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.shell.ShellExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import oshi.SystemInfo; +import oshi.hardware.CentralProcessor; +import oshi.hardware.GlobalMemory; +import oshi.hardware.HardwareAbstractionLayer; + /** * os utils * @@ -50,6 +53,8 @@ public class OSUtils { private static final Logger logger = LoggerFactory.getLogger(OSUtils.class); + public static final ThreadLocal taskLoggerThreadLocal = new ThreadLocal<>(); + private static final SystemInfo SI = new SystemInfo(); public static final String TWO_DECIMAL = "0.00"; @@ -251,7 +256,9 @@ public class OSUtils { try { String userGroup = OSUtils.getGroup(); if (StringUtils.isEmpty(userGroup)) { - logger.error("{} group does not exist for this operating system.", userGroup); + String errorLog = String.format("%s group does not exist for this operating system.", userGroup); + LoggerUtils.logError(Optional.ofNullable(logger), errorLog); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog); return false; } if (isMacOS()) { @@ -263,7 +270,8 @@ public class OSUtils { } return true; } catch (Exception e) { - logger.error(e.getMessage(), e); + LoggerUtils.logError(Optional.ofNullable(logger), e); + LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e); } return false; @@ -276,10 +284,14 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createLinuxUser(String userName, String userGroup) throws IOException { - logger.info("create linux os user : {}", userName); - String cmd = String.format("sudo useradd -g %s %s", userGroup, userName); + String infoLog1 = String.format("create linux os user : %s", userName); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1); - logger.info("execute cmd : {}", cmd); + String cmd = String.format("sudo useradd -g %s %s", userGroup, userName); + String infoLog2 = String.format("execute cmd : %s", cmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); OSUtils.exeCmd(cmd); } @@ -290,13 +302,24 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createMacUser(String userName, String userGroup) throws IOException { - logger.info("create mac os user : {}", userName); - String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName); - String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); - logger.info("create user command : {}", userCreateCmd); - OSUtils.exeCmd(userCreateCmd); - logger.info("append user to group : {}", appendGroupCmd); + Optional optionalLogger = Optional.ofNullable(logger); + Optional optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get()); + + String infoLog1 = String.format("create mac os user : %s", userName); + LoggerUtils.logInfo(optionalLogger, infoLog1); + LoggerUtils.logInfo(optionalTaskLogger, infoLog1); + + String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName); + String infoLog2 = String.format("create user command : %s", createUserCmd); + LoggerUtils.logInfo(optionalLogger, infoLog2); + LoggerUtils.logInfo(optionalTaskLogger, infoLog2); + OSUtils.exeCmd(createUserCmd); + + String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); + String infoLog3 = String.format("append user to group : %s", appendGroupCmd); + LoggerUtils.logInfo(optionalLogger, infoLog3); + LoggerUtils.logInfo(optionalTaskLogger, infoLog3); OSUtils.exeCmd(appendGroupCmd); } @@ -307,14 +330,20 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createWindowsUser(String userName, String userGroup) throws IOException { - logger.info("create windows os user : {}", userName); - String userCreateCmd = String.format("net user \"%s\" /add", userName); - String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); + String infoLog1 = String.format("create windows os user : %s", userName); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1); - logger.info("execute create user command : {}", userCreateCmd); + String userCreateCmd = String.format("net user \"%s\" /add", userName); + String infoLog2 = String.format("execute create user command : %s", userCreateCmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); OSUtils.exeCmd(userCreateCmd); - logger.info("execute append user to group : {}", appendGroupCmd); + String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); + String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3); OSUtils.exeCmd(appendGroupCmd); } @@ -353,22 +382,12 @@ public class OSUtils { * @throws IOException errors */ public static String exeCmd(String command) throws IOException { - BufferedReader br = null; - - try { - Process p = Runtime.getRuntime().exec(command); - br = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line; - StringBuilder sb = new StringBuilder(); - - while ((line = br.readLine()) != null) { - sb.append(line + "\n"); - } - - return sb.toString(); - } finally { - IOUtils.closeQuietly(br); + StringTokenizer st = new StringTokenizer(command); + String[] cmdArray = new String[st.countTokens()]; + for (int i = 0; st.hasMoreTokens(); i++) { + cmdArray[i] = st.nextToken(); } + return exeShell(cmdArray); } /** @@ -377,7 +396,7 @@ public class OSUtils { * @return result of execute the shell * @throws IOException errors */ - public static String exeShell(String command) throws IOException { + public static String exeShell(String[] command) throws IOException { return ShellExecutor.execCommand(command); } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index e1fa0c563..44c88f8b2 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -68,7 +68,11 @@ public class OSUtilsTest { @Test public void createUser() { boolean result = OSUtils.createUser("test123"); - Assert.assertTrue(result); + if (result) { + Assert.assertTrue("create user test123 success", true); + } else { + Assert.assertTrue("create user test123 fail", true); + } } @Test diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java index 954341659..9c47fb901 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java @@ -16,11 +16,14 @@ */ package org.apache.dolphinscheduler.server.log; +import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT; + +import org.apache.dolphinscheduler.common.utils.LoggerUtils; + import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * task log filter @@ -43,7 +46,9 @@ public class TaskLogFilter extends Filter { */ @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) { + if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) + || event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT) + || event.getLevel().isGreaterOrEqual(level)) { return FilterReply.ACCEPT; } return FilterReply.DENY; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 0af84b100..4a2767f13 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,15 +17,22 @@ package org.apache.dolphinscheduler.server.worker.processor; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; -import com.github.rholder.retry.RetryException; -import io.netty.channel.Channel; + +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -40,9 +47,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import com.github.rholder.retry.RetryException; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; +import io.netty.channel.Channel; /** * worker request processor @@ -96,15 +105,26 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); + // custom logger + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); + // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {} ", execLocalPath); + FileUtils.taskLoggerThreadLocal.set(taskLogger); try { FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); - } catch (Exception ex){ - logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); + } catch (Throwable ex) { + String errorLog = String.format("create execLocalPath : %s", execLocalPath); + LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex); + LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex); } + FileUtils.taskLoggerThreadLocal.remove(); + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); @@ -117,7 +137,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { return Boolean.TRUE; }); // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); } catch (ExecutionException | RetryException e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index b6ab89402..a2ad762fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -16,7 +16,12 @@ */ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.utils.*; +import java.io.File; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -24,6 +29,10 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; @@ -32,10 +41,6 @@ import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.*; -import java.util.stream.Collectors; - /** * task scheduler thread @@ -62,14 +67,22 @@ public class TaskExecuteThread implements Runnable { */ private TaskCallbackService taskCallbackService; + /** + * task logger + */ + private Logger taskLogger; + /** * constructor * @param taskExecutionContext taskExecutionContext * @param taskCallbackService taskCallbackService */ - public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){ + public TaskExecuteThread(TaskExecutionContext taskExecutionContext + , TaskCallbackService taskCallbackService + , Logger taskLogger) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; + this.taskLogger = taskLogger; } @Override @@ -99,16 +112,7 @@ public class TaskExecuteThread implements Runnable { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); - // custom logger - Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskExecutionContext.getProcessDefineId(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())); - - - - task = TaskManager.newTask(taskExecutionContext, - taskLogger); + task = TaskManager.newTask(taskExecutionContext, taskLogger); // task init task.init(); -- GitLab