未验证 提交 38e48537 编写于 作者: Y Yichao Yang 提交者: GitHub

[Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965)

* [Feature-2925][common] Add exitVal judge in OSUtils.exeCmd (#2925)

* optimize the logger utils
上级 9bf67d80
......@@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.common.shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
......@@ -30,6 +27,9 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A base class for running a Unix command.
......@@ -128,7 +128,7 @@ public abstract class AbstractShell {
/**
* Run a command actual work
*/
private void runCommand() throws IOException {
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null;
......@@ -153,11 +153,11 @@ public abstract class AbstractShell {
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
BufferedReader inReader =
new BufferedReader(new InputStreamReader(process
.getInputStream()));
new BufferedReader(
new InputStreamReader(process.getErrorStream()));
BufferedReader inReader =
new BufferedReader(
new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder();
// read error and input streams as this would free up the buffers
......@@ -177,23 +177,35 @@ public abstract class AbstractShell {
}
}
};
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
logger.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try {
errThread.start();
inThread.start();
} catch (IllegalStateException ise) { }
try {
// parse the output
parseExecResult(inReader);
exitCode = process.waitFor();
exitCode = process.waitFor();
try {
// make sure that the error thread exits
// make sure that the error and in thread exits
errThread.join();
inThread.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted while reading the error stream", ie);
logger.warn("Interrupted while reading the error and in stream", ie);
}
completed.set(true);
//the timeout thread handling
//taken care in finally block
if (exitCode != 0) {
if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
......
......@@ -16,18 +16,32 @@
*/
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Optional;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* file utils
*/
......@@ -36,6 +50,8 @@ public class FileUtils {
public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");
public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
/**
* get file suffix
*
......@@ -118,7 +134,7 @@ public class FileUtils {
String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId));
File file = new File(fileName);
if (!file.getParentFile().exists()){
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
......@@ -138,24 +154,40 @@ public class FileUtils {
* @param userName user name
* @throws IOException errors
*/
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
//if work dir exists, first delete
File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()){
if (execLocalPathFile.exists()) {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
}
//create work dir
org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile);
logger.info("create dir success {}" , execLocalPath);
String mkdirLog = "create dir success " + execLocalPath;
LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
//if not exists this user,then create
if (!OSUtils.getUserList().contains(userName)){
OSUtils.createUser(userName);
OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
try {
if (!OSUtils.getUserList().contains(userName)) {
boolean isSuccessCreateUser = OSUtils.createUser(userName);
String infoLog;
if (isSuccessCreateUser) {
infoLog = String.format("create user name success %s", userName);
} else {
infoLog = String.format("create user name fail %s", userName);
}
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
}
} catch (Throwable e) {
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
}
logger.info("create user name success {}", userName);
OSUtils.taskLoggerThreadLocal.remove();
}
......
......@@ -16,14 +16,15 @@
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
/**
* logger utils
*/
......@@ -93,4 +94,24 @@ public class LoggerUtils {
}
return appIds;
}
public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
}
public static void logError(Optional<Logger> optionalLogger
, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
}
public static void logError(Optional<Logger> optionalLogger
, String error, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
}
public static void logInfo(Optional<Logger> optionalLogger
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}
\ No newline at end of file
......@@ -16,16 +16,6 @@
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
......@@ -40,8 +30,21 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.apache.commons.configuration.Configuration;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
/**
* os utils
*
......@@ -50,6 +53,8 @@ public class OSUtils {
private static final Logger logger = LoggerFactory.getLogger(OSUtils.class);
public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00";
......@@ -251,7 +256,9 @@ public class OSUtils {
try {
String userGroup = OSUtils.getGroup();
if (StringUtils.isEmpty(userGroup)) {
logger.error("{} group does not exist for this operating system.", userGroup);
String errorLog = String.format("%s group does not exist for this operating system.", userGroup);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog);
return false;
}
if (isMacOS()) {
......@@ -263,7 +270,8 @@ public class OSUtils {
}
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
}
return false;
......@@ -276,10 +284,14 @@ public class OSUtils {
* @throws IOException in case of an I/O error
*/
private static void createLinuxUser(String userName, String userGroup) throws IOException {
logger.info("create linux os user : {}", userName);
String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
String infoLog1 = String.format("create linux os user : %s", userName);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
logger.info("execute cmd : {}", cmd);
String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
String infoLog2 = String.format("execute cmd : %s", cmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(cmd);
}
......@@ -290,13 +302,24 @@ public class OSUtils {
* @throws IOException in case of an I/O error
*/
private static void createMacUser(String userName, String userGroup) throws IOException {
logger.info("create mac os user : {}", userName);
String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
logger.info("create user command : {}", userCreateCmd);
OSUtils.exeCmd(userCreateCmd);
logger.info("append user to group : {}", appendGroupCmd);
Optional<Logger> optionalLogger = Optional.ofNullable(logger);
Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get());
String infoLog1 = String.format("create mac os user : %s", userName);
LoggerUtils.logInfo(optionalLogger, infoLog1);
LoggerUtils.logInfo(optionalTaskLogger, infoLog1);
String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String infoLog2 = String.format("create user command : %s", createUserCmd);
LoggerUtils.logInfo(optionalLogger, infoLog2);
LoggerUtils.logInfo(optionalTaskLogger, infoLog2);
OSUtils.exeCmd(createUserCmd);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
String infoLog3 = String.format("append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(optionalLogger, infoLog3);
LoggerUtils.logInfo(optionalTaskLogger, infoLog3);
OSUtils.exeCmd(appendGroupCmd);
}
......@@ -307,14 +330,20 @@ public class OSUtils {
* @throws IOException in case of an I/O error
*/
private static void createWindowsUser(String userName, String userGroup) throws IOException {
logger.info("create windows os user : {}", userName);
String userCreateCmd = String.format("net user \"%s\" /add", userName);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog1 = String.format("create windows os user : %s", userName);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
logger.info("execute create user command : {}", userCreateCmd);
String userCreateCmd = String.format("net user \"%s\" /add", userName);
String infoLog2 = String.format("execute create user command : %s", userCreateCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(userCreateCmd);
logger.info("execute append user to group : {}", appendGroupCmd);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3);
OSUtils.exeCmd(appendGroupCmd);
}
......@@ -353,22 +382,12 @@ public class OSUtils {
* @throws IOException errors
*/
public static String exeCmd(String command) throws IOException {
BufferedReader br = null;
try {
Process p = Runtime.getRuntime().exec(command);
br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
StringBuilder sb = new StringBuilder();
while ((line = br.readLine()) != null) {
sb.append(line + "\n");
}
return sb.toString();
} finally {
IOUtils.closeQuietly(br);
StringTokenizer st = new StringTokenizer(command);
String[] cmdArray = new String[st.countTokens()];
for (int i = 0; st.hasMoreTokens(); i++) {
cmdArray[i] = st.nextToken();
}
return exeShell(cmdArray);
}
/**
......@@ -377,7 +396,7 @@ public class OSUtils {
* @return result of execute the shell
* @throws IOException errors
*/
public static String exeShell(String command) throws IOException {
public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command);
}
......
......@@ -68,7 +68,11 @@ public class OSUtilsTest {
@Test
public void createUser() {
boolean result = OSUtils.createUser("test123");
Assert.assertTrue(result);
if (result) {
Assert.assertTrue("create user test123 success", true);
} else {
Assert.assertTrue("create user test123 fail", true);
}
}
@Test
......
......@@ -16,11 +16,14 @@
*/
package org.apache.dolphinscheduler.server.log;
import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
/**
* task log filter
......@@ -43,7 +46,9 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
*/
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) {
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)
|| event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT)
|| event.getLevel().isGreaterOrEqual(level)) {
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
......
......@@ -17,15 +17,22 @@
package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.github.rholder.retry.RetryException;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
......@@ -40,9 +47,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import com.github.rholder.retry.RetryException;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
/**
* worker request processor
......@@ -96,15 +105,26 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath);
FileUtils.taskLoggerThreadLocal.set(taskLogger);
try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
} catch (Throwable ex) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
}
FileUtils.taskLoggerThreadLocal.remove();
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
......@@ -117,7 +137,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
return Boolean.TRUE;
});
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -16,7 +16,12 @@
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.utils.*;
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
......@@ -24,6 +29,10 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
......@@ -32,10 +41,6 @@ import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
/**
* task scheduler thread
......@@ -62,14 +67,22 @@ public class TaskExecuteThread implements Runnable {
*/
private TaskCallbackService taskCallbackService;
/**
* task logger
*/
private Logger taskLogger;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){
public TaskExecuteThread(TaskExecutionContext taskExecutionContext
, TaskCallbackService taskCallbackService
, Logger taskLogger) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.taskLogger = taskLogger;
}
@Override
......@@ -99,16 +112,7 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext,
taskLogger);
task = TaskManager.newTask(taskExecutionContext, taskLogger);
// task init
task.init();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册