diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 41331abfe0459f05e5ded593dfff6af8e12887c2..e6df0f6fd5502f2670a811fc62d58c11817dc7d1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.entity; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; @@ -191,6 +192,19 @@ public class TaskExecutionContext implements Serializable{ */ private SqoopTaskExecutionContext sqoopTaskExecutionContext; + /** + * business param + */ + private Map paramsMap; + + public Map getParamsMap() { + return paramsMap; + } + + public void setParamsMap(Map paramsMap) { + this.paramsMap = paramsMap; + } + /** * procedure TaskExecutionContext */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 81f14e7ea9767a3c0468315877930366b82f550c..77a775d5e29f9753d9723b24ff072b1759cf2af8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,14 +17,20 @@ package org.apache.dolphinscheduler.server.worker.runner; +import static java.util.Calendar.DAY_OF_MONTH; + import com.alibaba.fastjson.JSONObject; import org.apache.commons.collections.MapUtils; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -128,7 +134,7 @@ public class TaskExecuteThread implements Runnable { // task init task.init(); - + preBuildBusinessParams(); // task handle task.handle(); @@ -154,6 +160,24 @@ public class TaskExecuteThread implements Runnable { } } + + private void preBuildBusinessParams(){ + Map paramsMap = new HashMap<>(); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (taskExecutionContext.getScheduleTime() != null) { + Date date = taskExecutionContext.getScheduleTime(); + if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { + date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); + } + String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_DATETIME); + paramsMap.put(Constants.PARAMETER_DATETIME, p); + } + taskExecutionContext.setParamsMap(paramsMap); + } + /** * when task finish, clear execute path. */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index afb9115b373b5c97811af1d639c6d62c023783d0..05221876e58b03c28879e8364425f0b54b25dc13 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import java.io.File; @@ -57,6 +58,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -157,7 +159,12 @@ public class DataxTask extends AbstractTask { dataXParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } // run datax process String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 4d3419005249e07035da09db55a79bf6c1bf25eb..450964d81f73e93c910ed3a12c5f27c6acdb76ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.commons.collections.MapUtils; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,6 +89,12 @@ public class FlinkTask extends AbstractYarnTask { flinkParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } logger.info("param Map : {}", paramsMap); if (paramsMap != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 8dc56595c73e73621ddad9a7e722b0b47b02d83b..acc3eab0b803c666cd8202c1615c8350e62ad4d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task.http; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; + +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.Charsets; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -51,6 +53,7 @@ import org.slf4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -148,6 +151,12 @@ public class HttpTask extends AbstractTask { httpParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } List httpPropertyList = new ArrayList<>(); if(CollectionUtils.isNotEmpty(httpParameters.getHttpParams() )){ for (HttpProperty httpProperty: httpParameters.getHttpParams()) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index f60b1cb4262ca8261094773290bebe2516e0bea2..4988794350a7837debc027f2409437ae6bdafa6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -32,7 +32,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.commons.collections.MapUtils; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -90,6 +93,12 @@ public class MapReduceTask extends AbstractYarnTask { mapreduceParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } if (paramsMap != null) { String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 7a66227b8d97636b0c238e65718b7029731c08fc..f8df005a683dcb87cf43c470157a9c54c1e69002 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -29,8 +29,12 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; + +import org.apache.commons.collections.MapUtils; + import org.slf4j.Logger; +import java.util.HashMap; import java.util.Map; /** @@ -120,6 +124,12 @@ public class PythonTask extends AbstractTask { pythonParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } if (paramsMap != null){ rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index a3317650a19a591db08bec8d796924dc3a7571d9..cca2a04140e53a3545ee0853ada2fc6e88581bfc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -32,6 +32,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; + +import org.apache.commons.collections.MapUtils; + import org.slf4j.Logger; import java.io.File; @@ -138,20 +141,11 @@ public class ShellTask extends AbstractTask { shellParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job - if (taskExecutionContext.getScheduleTime() != null) { - if (paramsMap == null) { - paramsMap = new HashMap<>(); - } - Date date = taskExecutionContext.getScheduleTime(); - if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { - date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); - } - String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); - Property p = new Property(); - p.setValue(dateTime); - p.setProp(Constants.PARAMETER_DATETIME); - paramsMap.put(Constants.PARAMETER_DATETIME, p); + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); } script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index af1a5c529b3ac2f052f6c59028757150c25c6ca0..f021cb74c201163a805214336fb2f7e830d7110a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import java.sql.Connection; @@ -181,9 +182,14 @@ public class SqlTask extends AbstractTask { sqlParameters.getLocalParametersMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){ + paramsMap.putAll(taskExecutionContext.getParamsMap()); + } // spell SQL according to the final user-defined variable - if(paramsMap == null){ + if(paramsMap.isEmpty()){ sqlBuilder.append(sql); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 00d94f01bf7266bb7fa90898172a4d7a78f5dd14..da5898dcd876422c765f07c8e651673cc327c8cb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -28,6 +28,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; +import org.apache.commons.collections.MapUtils; + +import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; @@ -78,7 +81,12 @@ public class SqoopTask extends AbstractYarnTask { sqoopParameters.getLocalParametersMap(), CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()), sqoopTaskExecutionContext.getScheduleTime()); - + if(MapUtils.isEmpty(paramsMap)){ + paramsMap=new HashMap<>(); + } + if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())){ + paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap()); + } if (paramsMap != null) { String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); logger.info("sqoop script: {}", resultScripts);