Unverified commit 8e2c312d, authored by Kirs and committed by GitHub

[1.3.7-prepare#5256][Improvement][common] Task status error (#5860)

issue #5026
pr #5256
Parent 76c4dd90
@@ -103,6 +103,11 @@ public final class Constants {
      */
     public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
 
+    /**
+     * yarn.job.history.status.address
+     */
+    public static final String YARN_JOB_HISTORY_STATUS_ADDRESS = "yarn.job.history.status.address";
+
     /**
      * hdfs configuration
      * hdfs.root.user
@@ -792,6 +797,10 @@ public final class Constants {
      */
     public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state";
 
+    /**
+     * com.amazonaws.services.s3.enableV4
+     */
+    public static final String AWS_S3_V4 = "com.amazonaws.services.s3.enableV4";
 
     /**
      * loginUserFromKeytab user
...
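The two keys above are plain constants; for orientation, here is a minimal, illustrative sketch of how they are meant to be consumed. It only mirrors the HadoopUtils changes further down; the class name and the printed output are assumptions, not part of this commit, while the import paths follow the dolphinscheduler-common module layout.

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

// Illustrative sketch only; not code from this commit.
public class NewConstantsSketch {
    public static void main(String[] args) {
        // yarn.job.history.status.address is read like the other YARN addresses:
        // a format string whose %s placeholder is later filled with a job id.
        String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
        System.out.println("job history address template: " + jobHistoryAddress);

        // AWS_S3_V4 names the AWS SDK system property that switches S3 requests to
        // Signature V4; it has to be set before the S3 filesystem is created, which
        // is exactly where the S3 branch of the HadoopUtils setup now sets it.
        System.setProperty(Constants.AWS_S3_V4, Constants.STRING_TRUE);
    }
}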
@@ -70,6 +70,7 @@ public class HadoopUtils implements Closeable {
     public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
     public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
     public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+    public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
 
     private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
@@ -122,7 +123,7 @@
         try {
             configuration = new HdfsConfiguration();
 
-            String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
+            String resourceStorageType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
             ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
 
             if (resUploadType == ResUploadType.HDFS){
@@ -155,22 +156,18 @@
                 logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
             }
 
-            if (fs == null) {
                 if (StringUtils.isNotEmpty(hdfsUser)) {
                     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
-                    ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-                        @Override
-                        public Boolean run() throws Exception {
-                            fs = FileSystem.get(configuration);
-                            return true;
-                        }
-                    });
+                    ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+                        fs = FileSystem.get(configuration);
+                        return true;
+                    });
                 } else {
                     logger.warn("hdfs.root.user is not set value!");
                     fs = FileSystem.get(configuration);
                 }
-            }
         } else if (resUploadType == ResUploadType.S3) {
+            System.setProperty(Constants.AWS_S3_V4, Constants.STRING_TRUE);
             configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
             configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
             configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
@@ -204,23 +201,23 @@
         /**
          * if rmHaIds is empty, single resourcemanager enabled
          * if rmHaIds not empty: resourcemanager HA enabled
          */
-        String appUrl = "";
-        if (StringUtils.isEmpty(rmHaIds)) {
-            //single resourcemanager enabled
-            appUrl = appAddress;
-            yarnEnabled = true;
-        } else {
-            //resourcemanager HA enabled
-            appUrl = getAppAddress(appAddress, rmHaIds);
-            yarnEnabled = true;
-            logger.info("application url : {}", appUrl);
-        }
+        yarnEnabled = true;
+        String appUrl = StringUtils.isEmpty(rmHaIds) ? appAddress : getAppAddress(appAddress, rmHaIds);
 
         if (StringUtils.isBlank(appUrl)) {
-            throw new Exception("application url is blank");
+            throw new Exception("yarn application url generation failed");
         }
 
-        return String.format(appUrl, applicationId);
+        if (logger.isDebugEnabled()) {
+            logger.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId);
+        }
+
+        String activeResourceManagerPort = String.valueOf(PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088));
+        return String.format(appUrl, activeResourceManagerPort, applicationId);
+    }
+
+    public String getJobHistoryUrl(String applicationId) {
+        //eg:application_1587475402360_712719 -> job_1587475402360_712719
+        String jobId = applicationId.replace("application", "job");
+        return String.format(jobHistoryAddress, jobId);
     }
 
     /**
...
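The point of the new getJobHistoryUrl helper is the fallback it enables: once an application has been purged from the ResourceManager, its status can still be looked up on the job history server, which is presumably the scenario behind the task status error referenced above. Below is a minimal sketch of that flow; it assumes the URL builder whose body appears in the last hunk is HadoopUtils.getApplicationUrl, and it uses a plain JDK HTTP helper rather than any DolphinScheduler API.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

import org.apache.dolphinscheduler.common.utils.HadoopUtils;

// Illustrative sketch of the ResourceManager -> job-history fallback; this class,
// the httpGet helper and the empty-body check are assumptions, not the commit's
// actual status-parsing code.
public class YarnStatusFallbackSketch {

    public static String fetchStatusJson(String applicationId) throws Exception {
        HadoopUtils hadoopUtils = HadoopUtils.getInstance();

        // ResourceManager REST URL; the new port placeholder is filled from
        // resource.manager.httpaddress.port (default 8088).
        String body = httpGet(hadoopUtils.getApplicationUrl(applicationId));

        if (body == null || body.isEmpty()) {
            // The ResourceManager no longer knows this application, so ask the
            // job history server instead (application_xxx becomes job_xxx).
            body = httpGet(hadoopUtils.getJobHistoryUrl(applicationId));
        }
        return body;
    }

    private static String httpGet(String urlText) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(urlText).openConnection();
        conn.setRequestMethod("GET");
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            return null;
        }
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
        }
    }
}

Only the two getter calls come from this commit; the HTTP handling and the empty-body check stand in for whatever status parsing the real caller performs.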
@@ -61,7 +61,9 @@ fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
 yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
 
 # if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
-yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s
+yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
+# if custom you resourcemanager port ,you need to replace 8088 else default value.
+resource.manager.httpaddress.port=8088
 
 # system env path
 #dolphinscheduler.env.path=env/dolphinscheduler_env.sh
...
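Since yarn.application.status.address now carries two placeholders, the port from resource.manager.httpaddress.port is substituted before the application id. A small self-contained illustration of the resulting formatting follows; the hostname and the job-history address value are assumed examples, not live endpoints.

// Self-contained illustration of how the two format-string settings are filled in.
public class YarnUrlFormattingDemo {
    public static void main(String[] args) {
        String appAddress = "http://ds1:%s/ws/v1/cluster/apps/%s";   // yarn.application.status.address
        String resourceManagerHttpPort = "8088";                     // resource.manager.httpaddress.port
        String applicationId = "application_1587475402360_712719";

        // First %s -> ResourceManager HTTP port, second %s -> application id
        String appUrl = String.format(appAddress, resourceManagerHttpPort, applicationId);
        System.out.println(appUrl); // http://ds1:8088/ws/v1/cluster/apps/application_1587475402360_712719

        // getJobHistoryUrl(): application_xxx -> job_xxx, then formatted into the
        // job history address (the address value below is an assumed example).
        String jobHistoryAddress = "http://ds1:19888/ws/v1/history/mapreduce/jobs/%s";
        String jobId = applicationId.replace("application", "job");
        System.out.println(String.format(jobHistoryAddress, jobId));
        // http://ds1:19888/ws/v1/history/mapreduce/jobs/job_1587475402360_712719
    }
}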