diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 1865d3ec54825ca42ab7a7da9a5be34ba0d6d238..667bdefb196d9791ece31b3b6de9374c3b28c2cf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 52353bebd5caf25963262d6fc6aa82f23735615a..a3f5fe505b82aa72f1741db9156ab391b6389fb0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index fc10a2943fdf5a08226c3a91944ab0486eebbfda..b06ad37e2fa30b92fa3e18bb7640119c051e1200 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskRequest.java deleted file mode 100644 index 8afc3973983f7d524f61621a36e49f4fbd14bdd3..0000000000000000000000000000000000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskRequest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.task.request; - -/** - * DataX Task ExecutionContext - * to master/worker task transport - */ -public class DataxTaskRequest extends TaskRequest { - - /** - * dataSourceId - */ - private int dataSourceId; - - /** - * sourcetype - */ - private int sourcetype; - - /** - * sourceConnectionParams - */ - private String sourceConnectionParams; - - /** - * dataTargetId - */ - private int dataTargetId; - - /** - * targetType - */ - private int targetType; - - /** - * targetConnectionParams - */ - private String targetConnectionParams; - - public int getDataSourceId() { - return dataSourceId; - } - - public void setDataSourceId(int dataSourceId) { - this.dataSourceId = dataSourceId; - } - - public int getSourcetype() { - return sourcetype; - } - - public void setSourcetype(int sourcetype) { - this.sourcetype = sourcetype; - } - - public String getSourceConnectionParams() { - return sourceConnectionParams; - } - - public void setSourceConnectionParams(String sourceConnectionParams) { - this.sourceConnectionParams = sourceConnectionParams; - } - - public int getDataTargetId() { - return dataTargetId; - } - - public void setDataTargetId(int dataTargetId) { - this.dataTargetId = dataTargetId; - } - - public int getTargetType() { - return targetType; - } - - public void setTargetType(int targetType) { - this.targetType = targetType; - } - - public String getTargetConnectionParams() { - return targetConnectionParams; - } - - public void setTargetConnectionParams(String targetConnectionParams) { - this.targetConnectionParams = targetConnectionParams; - } - - @Override - public String toString() { - return "DataxTaskExecutionContext{" - + "dataSourceId=" + dataSourceId - + ", sourcetype=" + sourcetype - + ", sourceConnectionParams='" + sourceConnectionParams + '\'' - + ", dataTargetId=" + dataTargetId - + ", targetType=" + targetType - + ", targetConnectionParams='" + targetConnectionParams + '\'' - + '}'; - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskExecutionContext.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java rename to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskExecutionContext.java index d5fc97c8de8887886f93f4c5e0cd9521748a8b78..07d94b145b7f62d8f23c67e14c77eec753ff5cdd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskExecutionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.spi.task.request; import java.io.Serializable; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskRequest.java deleted file mode 100644 index b043d7f1d4b1ed48d470cf9845803dad764876c2..0000000000000000000000000000000000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskRequest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.task.request; - -/** - * Procedure Task ExecutionContext - * to master/worker task transport - */ -public class ProcedureTaskRequest extends TaskRequest { - - /** - * connectionParams - */ - private String connectionParams; - - public String getConnectionParams() { - return connectionParams; - } - - public void setConnectionParams(String connectionParams) { - this.connectionParams = connectionParams; - } - - @Override - public String toString() { - return "ProcedureTaskExecutionContext{" - + "connectionParams='" + connectionParams + '\'' - + '}'; - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskRequest.java deleted file mode 100644 index 0608af2fee59b8172fd0792b81236e4508412463..0000000000000000000000000000000000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskRequest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.task.request; - -import org.apache.dolphinscheduler.spi.task.UdfFuncBean; -import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer; - -import java.util.Map; - -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; - -/** - * SQL Task ExecutionContext - * to master/worker task transport - */ -public class SQLTaskRequest extends TaskRequest { - - /** - * warningGroupId - */ - private int warningGroupId; - - /** - * connectionParams - */ - private String connectionParams; - /** - * udf function tenant code map - */ - @JsonDeserialize(keyUsing = UdfFuncDeserializer.class) - private Map udfFuncTenantCodeMap; - - public int getWarningGroupId() { - return warningGroupId; - } - - public void setWarningGroupId(int warningGroupId) { - this.warningGroupId = warningGroupId; - } - - public Map getUdfFuncTenantCodeMap() { - return udfFuncTenantCodeMap; - } - - public void setUdfFuncTenantCodeMap(Map udfFuncTenantCodeMap) { - this.udfFuncTenantCodeMap = udfFuncTenantCodeMap; - } - - public String getConnectionParams() { - return connectionParams; - } - - public void setConnectionParams(String connectionParams) { - this.connectionParams = connectionParams; - } - - @Override - public String toString() { - return "SQLTaskExecutionContext{" - + "warningGroupId=" + warningGroupId - + ", connectionParams='" + connectionParams + '\'' - + ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap - + '}'; - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskRequest.java deleted file mode 100644 index 69f8cf8d66e94dacd34620c33bcadda8a8d8cd85..0000000000000000000000000000000000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskRequest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.task.request; - -/** - * Sqoop Task ExecutionContext - * to master/worker task transport - */ -public class SqoopTaskRequest extends TaskRequest { - - /** - * dataSourceId - */ - private int dataSourceId; - - /** - * sourcetype - */ - private int sourcetype; - - /** - * sourceConnectionParams - */ - private String sourceConnectionParams; - - /** - * dataTargetId - */ - private int dataTargetId; - - /** - * targetType - */ - private int targetType; - - /** - * targetConnectionParams - */ - private String targetConnectionParams; - - public int getDataSourceId() { - return dataSourceId; - } - - public void setDataSourceId(int dataSourceId) { - this.dataSourceId = dataSourceId; - } - - public int getSourcetype() { - return sourcetype; - } - - public void setSourcetype(int sourcetype) { - this.sourcetype = sourcetype; - } - - public String getSourceConnectionParams() { - return sourceConnectionParams; - } - - public void setSourceConnectionParams(String sourceConnectionParams) { - this.sourceConnectionParams = sourceConnectionParams; - } - - public int getDataTargetId() { - return dataTargetId; - } - - public void setDataTargetId(int dataTargetId) { - this.dataTargetId = dataTargetId; - } - - public int getTargetType() { - return targetType; - } - - public void setTargetType(int targetType) { - this.targetType = targetType; - } - - public String getTargetConnectionParams() { - return targetConnectionParams; - } - - public void setTargetConnectionParams(String targetConnectionParams) { - this.targetConnectionParams = targetConnectionParams; - } - - @Override - public String toString() { - return "SqoopTaskExecutionContext{" - + "dataSourceId=" + dataSourceId - + ", sourcetype=" + sourcetype - + ", sourceConnectionParams='" + sourceConnectionParams + '\'' - + ", dataTargetId=" + dataTargetId - + ", targetType=" + targetType - + ", targetConnectionParams='" + targetConnectionParams + '\'' - + '}'; - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java index 3e131f25f5e4402426822e6bdc6e1e368eea11bc..6a4aac6cf1a561212826dc9ebadb8b20a3a2a249 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java @@ -199,6 +199,11 @@ public class TaskRequest { */ private DataxTaskExecutionContext dataxTaskExecutionContext; + /** + * procedure TaskExecutionContext + */ + private ProcedureTaskExecutionContext procedureTaskExecutionContext; + /** * sqoop TaskExecutionContext */ @@ -467,4 +472,12 @@ public class TaskRequest { public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) { this.sqoopTaskExecutionContext = sqoopTaskExecutionContext; } + + public ProcedureTaskExecutionContext getProcedureTaskExecutionContext() { + return procedureTaskExecutionContext; + } + + public void setProcedureTaskExecutionContext(ProcedureTaskExecutionContext procedureTaskExecutionContext) { + this.procedureTaskExecutionContext = procedureTaskExecutionContext; + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 564ed1ad58ce9624e0e193446019ac606cc56e60..9087070df30ba8739ef9982c71f807c5e98247f2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -34,7 +34,8 @@ import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; -import org.apache.dolphinscheduler.spi.task.request.DataxTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.CollectionUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -107,14 +108,14 @@ public class DataxTask extends AbstractTaskExecutor { /** * taskExecutionContext */ - private DataxTaskRequest taskExecutionContext; + private TaskRequest taskExecutionContext; /** * constructor * * @param taskExecutionContext taskExecutionContext */ - public DataxTask(DataxTaskRequest taskExecutionContext) { + public DataxTask(TaskRequest taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; @@ -232,14 +233,14 @@ public class DataxTask extends AbstractTaskExecutor { * @throws SQLException if error throws SQLException */ private List buildDataxJobContentJson() { - + DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext(); BaseConnectionParam dataSourceCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams( - DbType.of(taskExecutionContext.getSourcetype()), - taskExecutionContext.getSourceConnectionParams()); + DbType.of(dataxTaskExecutionContext.getSourcetype()), + dataxTaskExecutionContext.getSourceConnectionParams()); BaseConnectionParam dataTargetCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams( - DbType.of(taskExecutionContext.getTargetType()), - taskExecutionContext.getTargetConnectionParams()); + DbType.of(dataxTaskExecutionContext.getTargetType()), + dataxTaskExecutionContext.getTargetConnectionParams()); List readerConnArr = new ArrayList<>(); ObjectNode readerConn = JSONUtils.createObjectNode(); @@ -260,7 +261,7 @@ public class DataxTask extends AbstractTaskExecutor { readerParam.putArray("connection").addAll(readerConnArr); ObjectNode reader = JSONUtils.createObjectNode(); - reader.put("name", DataxUtils.getReaderPluginName(DbType.of(taskExecutionContext.getSourcetype()))); + reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype()))); reader.set("parameter", readerParam); List writerConnArr = new ArrayList<>(); @@ -275,8 +276,8 @@ public class DataxTask extends AbstractTaskExecutor { writerParam.put("username", dataTargetCfg.getUser()); writerParam.put("password", decodePassword(dataTargetCfg.getPassword())); - String[] columns = parsingSqlColumnNames(DbType.of(taskExecutionContext.getSourcetype()), - DbType.of(taskExecutionContext.getTargetType()), + String[] columns = parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()), + DbType.of(dataxTaskExecutionContext.getTargetType()), dataSourceCfg, dataXParameters.getSql()); ArrayNode columnArr = writerParam.putArray("column"); @@ -301,7 +302,7 @@ public class DataxTask extends AbstractTaskExecutor { } ObjectNode writer = JSONUtils.createObjectNode(); - writer.put("name", DataxUtils.getWriterPluginName(DbType.of(taskExecutionContext.getTargetType()))); + writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType()))); writer.set("parameter", writerParam); List contentList = new ArrayList<>(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java index fd10af28dd21d959ce8a4f8213767f0d8ff2e01e..a45ed4c5f3066d5fc0d16f1677be1cbf6a911441 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskChannel; -import org.apache.dolphinscheduler.spi.task.request.DataxTaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; public class DataxTaskChannel implements TaskChannel { @@ -31,6 +30,6 @@ public class DataxTaskChannel implements TaskChannel { @Override public AbstractTask createTask(TaskRequest taskRequest) { - return new DataxTask((DataxTaskRequest) taskRequest); + return new DataxTask(taskRequest); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index cf9285ec2452b65c2583e93d6e17da0867585946..10882e04db492602c255a1a84910ba688d18a0e1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -122,7 +122,6 @@ public class FlinkTask extends AbstractYarnTask { if (resourceId == 0) { resourceName = mainJar.getRes(); } else { - //when update resource maybe has error ,也许也可以交给上层去做控制 需要看资源是否可以抽象为共性 目前来讲我认为是可以的 resourceName = mainJar.getResourceName().replaceFirst("/", ""); } mainJar.setRes(resourceName); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 6e4a251e4d924c43213943911d74741deae4df28..615617dd6a227fa569bbf8b24a7fc9ec28e3e500 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -125,7 +125,6 @@ public class MapReduceTask extends AbstractYarnTask { if (resourceId == 0) { resourceName = mainJar.getRes(); } else { - //when update resource maybe has error ,也许也可以交给上层去做控制 需要看资源是否可以抽象为共性 目前来讲我认为是可以的 resourceName = mainJar.getResourceName().replaceFirst("/", ""); } mainJar.setRes(resourceName); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 4c830d499ba44a90140783c659c31ee53f311854..397a1500a3ed04c12bbb401d6bedac6aee53c0df 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.spi.task.Direct; import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; -import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.CollectionUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -44,7 +44,6 @@ import java.sql.SQLException; import java.sql.Types; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -60,14 +59,14 @@ public class ProcedureTask extends AbstractTaskExecutor { /** * taskExecutionContext */ - private ProcedureTaskRequest taskExecutionContext; + private TaskRequest taskExecutionContext; /** * constructor * * @param taskExecutionContext taskExecutionContext */ - public ProcedureTask(ProcedureTaskRequest taskExecutionContext) { + public ProcedureTask(TaskRequest taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; @@ -101,7 +100,7 @@ public class ProcedureTask extends AbstractTaskExecutor { DbType dbType = DbType.valueOf(procedureParameters.getType()); // get datasource ConnectionParam connectionParam = DatasourceUtil.buildConnectionParams(DbType.valueOf(procedureParameters.getType()), - taskExecutionContext.getConnectionParams()); + taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams()); // get jdbc connection connection = DatasourceUtil.getConnection(dbType, connectionParam); @@ -142,10 +141,7 @@ public class ProcedureTask extends AbstractTaskExecutor { */ private void printOutParameter(CallableStatement stmt, Map outParameterMap) throws SQLException { - Iterator> iter = outParameterMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - + for (Map.Entry en : outParameterMap.entrySet()) { int index = en.getKey(); Property property = en.getValue(); String prop = property.getProp(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java index d0de0eb574c1c5b9a3a7eff304670a561baed4da..5181dc1254a65f52ae62edfcddcf33a566ca3e91 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.procedure; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskChannel; -import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; public class ProcedureTaskChannel implements TaskChannel { @@ -31,6 +30,6 @@ public class ProcedureTaskChannel implements TaskChannel { @Override public AbstractTask createTask(TaskRequest taskRequest) { - return new ProcedureTask((ProcedureTaskRequest) taskRequest); + return new ProcedureTask(taskRequest); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index a5f43762fc52924a49d8c7272dddd188de90dee0..a958c6e501a39d560e3d51a86b581de8e1af153e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import java.util.HashMap; @@ -44,18 +44,18 @@ public class SqoopTask extends AbstractYarnTask { /** * taskExecutionContext */ - private final SqoopTaskRequest sqoopTaskExecutionContext; + private final TaskRequest taskExecutionContext; - public SqoopTask(SqoopTaskRequest taskExecutionContext) { + public SqoopTask(TaskRequest taskExecutionContext) { super(taskExecutionContext); - this.sqoopTaskExecutionContext = taskExecutionContext; + this.taskExecutionContext = taskExecutionContext; } @Override public void init() { - logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams()); + logger.info("sqoop task params {}", taskExecutionContext.getTaskParams()); sqoopParameters = - JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class); + JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqoopParameters.class); //check sqoop task params if (null == sqoopParameters) { throw new IllegalArgumentException("Sqoop Task params is null"); @@ -70,16 +70,16 @@ public class SqoopTask extends AbstractYarnTask { protected String buildCommand() { //get sqoop scripts SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); + String script = generator.generateSqoopJob(sqoopParameters, taskExecutionContext); // combining local and global parameters - Map paramsMap = ParamUtils.convert(sqoopTaskExecutionContext, getParameters()); + Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); if (MapUtils.isEmpty(paramsMap)) { paramsMap = new HashMap<>(); } - if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) { - paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap()); + if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { + paramsMap.putAll(taskExecutionContext.getParamsMap()); } String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java index f97b2218510d6ea05ee3c39307f9d0cd05d4579c..aa436696020a7b4b0a7f6f4364a3aa7de9cd9da1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.sqoop; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskChannel; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; public class SqoopTaskChannel implements TaskChannel { @@ -31,6 +30,6 @@ public class SqoopTaskChannel implements TaskChannel { @Override public AbstractTask createTask(TaskRequest taskRequest) { - return new SqoopTask((SqoopTaskRequest) taskRequest); + return new SqoopTask(taskRequest); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java index fcb654293bf1ea5053de884caa70e5f43b0f7bd2..7e770a84e56e971fc7584f5f95e23488cbb0913a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop.generator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; /** * Source Generator Interface @@ -32,5 +32,5 @@ public interface ISourceGenerator { * @param taskExecutionContext taskExecutionContext * @return source script */ - String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java index 4c2fb4b6da83d6c7c06d6d3976b71e77e0cccebc..9b31273eda7519098495280ccbb11dbdde3ca069 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop.generator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; /** * Target Generator Interface @@ -32,5 +32,5 @@ public interface ITargetGenerator { * @param taskExecutionContext taskExecutionContext * @return target script */ - String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext); + String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java index 0c23a069188679166aaf1f291d44aa043f37373e..fba82df3d64bf11a97a854b57366d22fe7feb129 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.HdfsTarge import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.HiveTargetGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.MysqlTargetGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; /** * Sqoop Job Scripts Generator @@ -64,7 +64,7 @@ public class SqoopJobGenerator { * @param sqoopParameters sqoop params * @return sqoop scripts */ - public String generateSqoopJob(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generateSqoopJob(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { String sqoopScripts = ""; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java index 26ab521ca388a874e4b62e6bcb9fe4c6766a8a50..ec8f23e212906f6631970a536f2e5e48cd4d8e09 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java @@ -23,7 +23,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ISourceGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceHdfsParameter; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -38,7 +38,7 @@ public class HdfsSourceGenerator implements ISourceGenerator { private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder hdfsSourceSb = new StringBuilder(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java index 69fd835846ff48f702ed3708be1a7b0838260fd7..4d334f0d3d7bad997edb78a0b5da6fecf21932f2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java @@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ISourceGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceHiveParameter; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -41,7 +41,7 @@ public class HiveSourceGenerator implements ISourceGenerator { private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder hiveSourceSb = new StringBuilder(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java index 5542572992a4c1a241b7da6604ebb9fe1e94d2c9..b1ee6a9c4c6bac17c6b53e9779eedb4b605cf921 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.task.Property; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -59,7 +59,7 @@ public class MysqlSourceGenerator implements ISourceGenerator { private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder mysqlSourceSb = new StringBuilder(); @@ -68,8 +68,8 @@ public class MysqlSourceGenerator implements ISourceGenerator { if (null != sourceMysqlParameter) { BaseConnectionParam baseDataSource = (BaseConnectionParam) DatasourceUtil.buildConnectionParams( - DbType.of(taskExecutionContext.getSourcetype()), - taskExecutionContext.getSourceConnectionParams()); + DbType.of(taskExecutionContext.getSqoopTaskExecutionContext().getSourcetype()), + taskExecutionContext.getSqoopTaskExecutionContext().getSourceConnectionParams()); if (null != baseDataSource) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java index b36bdd1b1f22429c1d3d6a7b71a329131d05ee1a..84adf922334e80fd96766a5ed0840478f4f34b8d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java @@ -29,7 +29,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetHdfsParameter; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -44,7 +44,7 @@ public class HdfsTargetGenerator implements ITargetGenerator { private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder hdfsTargetSb = new StringBuilder(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java index acab488d1fc4aea6f7a5ed58be572b49c21f5615..05ba68fe9e076500ba9504fc230a10bcbd055990 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java @@ -32,7 +32,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetHiveParameter; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -47,7 +47,7 @@ public class HiveTargetGenerator implements ITargetGenerator { private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder hiveTargetSb = new StringBuilder(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java index 4cb4c6d4973d253b59166a49cd5042756b958ef7..8c3450262a73c563c980a650d15c541bba240c6e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -37,7 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.spi.enums.DbType; -import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -52,7 +52,7 @@ public class MysqlTargetGenerator implements ITargetGenerator { private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class); @Override - public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) { + public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) { StringBuilder mysqlTargetSb = new StringBuilder(); @@ -64,8 +64,8 @@ public class MysqlTargetGenerator implements ITargetGenerator { // get datasource BaseConnectionParam baseDataSource = (BaseConnectionParam) DatasourceUtil.buildConnectionParams( - DbType.of(taskExecutionContext.getTargetType()), - taskExecutionContext.getTargetConnectionParams()); + DbType.of(taskExecutionContext.getSqoopTaskExecutionContext().getTargetType()), + taskExecutionContext.getSqoopTaskExecutionContext().getTargetConnectionParams()); if (null != baseDataSource) {