未验证 提交 d6fc749c 编写于 作者: K Kirs 提交者: GitHub

[FIX-#6007]Wrong complement date (#6009)

上级 40db1877
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
......@@ -191,6 +192,19 @@ public class TaskExecutionContext implements Serializable{
*/
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
* business param
*/
private Map<String, Property> paramsMap;
public Map<String, Property> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, Property> paramsMap) {
this.paramsMap = paramsMap;
}
/**
* procedure TaskExecutionContext
*/
......
......@@ -17,14 +17,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
import static java.util.Calendar.DAY_OF_MONTH;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
......@@ -128,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
// task init
task.init();
preBuildBusinessParams();
// task handle
task.handle();
......@@ -154,6 +160,24 @@ public class TaskExecuteThread implements Runnable {
}
}
private void preBuildBusinessParams(){
Map<String, Property> paramsMap = new HashMap<>();
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
Date date = taskExecutionContext.getScheduleTime();
if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
}
String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_DATETIME);
paramsMap.put(Constants.PARAMETER_DATETIME, p);
}
taskExecutionContext.setParamsMap(paramsMap);
}
/**
* when task finish, clear execute path.
*/
......
......@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
......@@ -57,6 +58,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -157,7 +159,12 @@ public class DataxTask extends AbstractTask {
dataXParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// run datax process
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
......
......@@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -86,6 +89,12 @@ public class FlinkTask extends AbstractYarnTask {
flinkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
......
......@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task.http;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -51,6 +53,7 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -148,6 +151,12 @@ public class HttpTask extends AbstractTask {
httpParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
List<HttpProperty> httpPropertyList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(httpParameters.getHttpParams() )){
for (HttpProperty httpProperty: httpParameters.getHttpParams()) {
......
......@@ -32,7 +32,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -90,6 +93,12 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
......
......@@ -29,8 +29,12 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Map;
/**
......@@ -120,6 +124,12 @@ public class PythonTask extends AbstractTask {
pythonParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
if (paramsMap != null){
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
......
......@@ -32,6 +32,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import java.io.File;
......@@ -138,20 +141,11 @@ public class ShellTask extends AbstractTask {
shellParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {
paramsMap = new HashMap<>();
}
Date date = taskExecutionContext.getScheduleTime();
if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
}
String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_DATETIME);
paramsMap.put(Constants.PARAMETER_DATETIME, p);
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
......
......@@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
......@@ -181,9 +182,14 @@ public class SqlTask extends AbstractTask {
sqlParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// spell SQL according to the final user-defined variable
if(paramsMap == null){
if(paramsMap.isEmpty()){
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
......
......@@ -28,6 +28,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.commons.collections.MapUtils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
......@@ -78,7 +81,12 @@ public class SqoopTask extends AbstractYarnTask {
sqoopParameters.getLocalParametersMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
if(MapUtils.isEmpty(paramsMap)){
paramsMap=new HashMap<>();
}
if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())){
paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
}
if (paramsMap != null) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册