提交 223bdec8 编写于 作者: Q qiaozhanwei

refactor-worker merge to dev bug fix

上级 41d8abda
......@@ -739,6 +739,7 @@ public final class Constants {
public static final String ALIAS = "alias";
public static final String CONTENT = "content";
public static final String DEPENDENT_SPLIT = ":||";
public static final String DEPENDENT_ALL = "ALL";
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import java.io.Serializable;
/**
* master/worker task transport
*/
public class SqoopTaskExecutionContext implements Serializable{
/**
* dataSourceId
*/
private int dataSourceId;
/**
* sourcetype
*/
private int sourcetype;
/**
* sourceConnectionParams
*/
private String sourceConnectionParams;
/**
* dataTargetId
*/
private int dataTargetId;
/**
* targetType
*/
private int targetType;
/**
* targetConnectionParams
*/
private String targetConnectionParams;
public int getDataSourceId() {
return dataSourceId;
}
public void setDataSourceId(int dataSourceId) {
this.dataSourceId = dataSourceId;
}
public int getSourcetype() {
return sourcetype;
}
public void setSourcetype(int sourcetype) {
this.sourcetype = sourcetype;
}
public String getSourceConnectionParams() {
return sourceConnectionParams;
}
public void setSourceConnectionParams(String sourceConnectionParams) {
this.sourceConnectionParams = sourceConnectionParams;
}
public int getDataTargetId() {
return dataTargetId;
}
public void setDataTargetId(int dataTargetId) {
this.dataTargetId = dataTargetId;
}
public int getTargetType() {
return targetType;
}
public void setTargetType(int targetType) {
this.targetType = targetType;
}
public String getTargetConnectionParams() {
return targetConnectionParams;
}
public void setTargetConnectionParams(String targetConnectionParams) {
this.targetConnectionParams = targetConnectionParams;
}
@Override
public String toString() {
return "SqoopTaskExecutionContext{" +
"dataSourceId=" + dataSourceId +
", sourcetype=" + sourcetype +
", sourceConnectionParams='" + sourceConnectionParams + '\'' +
", dataTargetId=" + dataTargetId +
", targetType=" + targetType +
", targetConnectionParams='" + targetConnectionParams + '\'' +
'}';
}
}
......@@ -187,6 +187,11 @@ public class TaskExecutionContext implements Serializable{
*/
private DependenceTaskExecutionContext dependenceTaskExecutionContext;
/**
* sqoop TaskExecutionContext
*/
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
* procedure TaskExecutionContext
*/
......@@ -446,6 +451,14 @@ public class TaskExecutionContext implements Serializable{
this.resources = resources;
}
public SqoopTaskExecutionContext getSqoopTaskExecutionContext() {
return sqoopTaskExecutionContext;
}
public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) {
this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
}
@Override
public String toString() {
return "TaskExecutionContext{" +
......@@ -479,6 +492,7 @@ public class TaskExecutionContext implements Serializable{
", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext +
", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext +
", procedureTaskExecutionContext=" + procedureTaskExecutionContext +
'}';
}
......
......@@ -26,8 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......@@ -67,20 +67,26 @@ public class DependentTask extends AbstractTask {
*/
private ProcessService processService;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
* @param props props
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public DependentTask(TaskProps props, Logger logger) {
super(props, logger);
public DependentTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init(){
logger.info("dependent task initialize");
this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(),
this.dependentParameters = JSONUtils.parseObject(null,
DependentParameters.class);
if(dependentParameters != null){
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
......@@ -91,10 +97,10 @@ public class DependentTask extends AbstractTask {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
if(taskExecutionContext.getScheduleTime() != null){
this.dependentDate = taskExecutionContext.getScheduleTime();
}else{
this.dependentDate = taskProps.getTaskStartTime();
this.dependentDate = taskExecutionContext.getStartTime();
}
}
......@@ -102,13 +108,13 @@ public class DependentTask extends AbstractTask {
@Override
public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
try{
TaskInstance taskInstance = null;
while(Stopper.isRunning()){
taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId());
taskInstance = processService.findTaskInstanceById(this.taskExecutionContext.getTaskInstanceId());
if(taskInstance == null){
exitStatusCode = -1;
......
......@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.slf4j.Logger;
import java.util.Map;
......@@ -62,7 +61,7 @@ public class SqoopTask extends AbstractYarnTask {
protected String buildCommand() throws Exception {
//get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters);
String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext);
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* Source Generator Interface
......@@ -25,8 +26,9 @@ public interface ISourceGenerator {
/**
* generate the source script
* @param sqoopParameters sqoop params
* @return
* @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return source script
*/
String generate(SqoopParameters sqoopParameters);
String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* Target Generator Interface
......@@ -24,9 +25,10 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
public interface ITargetGenerator {
/**
* generate the target script
* @param sqoopParameters sqoop params
* @return
* generate the target script
* @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return target script
*/
String generate(SqoopParameters sqoopParameters);
String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
}
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HiveSourceGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.MysqlSourceGenerator;
......@@ -60,15 +61,15 @@ public class SqoopJobGenerator {
* @param sqoopParameters
* @return
*/
public String generateSqoopJob(SqoopParameters sqoopParameters){
public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){
createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
if(sourceGenerator == null || targetGenerator == null){
return null;
}
return commonGenerator.generate(sqoopParameters)
+ sourceGenerator.generate(sqoopParameters)
+ targetGenerator.generate(sqoopParameters);
+ sourceGenerator.generate(sqoopParameters,taskExecutionContext)
+ targetGenerator.generate(sqoopParameters,taskExecutionContext);
}
/**
......
......@@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,7 +33,7 @@ public class HdfsSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try{
SourceHdfsParameter sourceHdfsParameter
......
......@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,7 +33,7 @@ public class HiveSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder sb = new StringBuilder();
try{
SourceHiveParameter sourceHiveParameter
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.QueryType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
......@@ -24,10 +25,9 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -41,17 +41,17 @@ public class MysqlSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try {
SourceMysqlParameter sourceMysqlParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);
SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
if(sourceMysqlParameter != null){
ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
DataSource dataSource= processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()),
sqoopTaskExecutionContext.getSourceConnectionParams());
if(baseDataSource != null){
result.append(" --connect ")
.append(baseDataSource.getJdbcUrl())
......
......@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,7 +33,7 @@ public class HdfsTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try{
TargetHdfsParameter targetHdfsParameter =
......
......@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,7 +33,7 @@ public class HiveTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
......
......@@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -37,7 +40,7 @@ public class MysqlTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try{
......@@ -45,13 +48,13 @@ public class MysqlTargetGenerator implements ITargetGenerator {
TargetMysqlParameter targetMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class);
SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){
ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
DataSource dataSource= processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
// get datasource
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()),
sqoopTaskExecutionContext.getTargetConnectionParams());
if(baseDataSource != null){
result.append(" --connect ")
......
......@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -76,10 +77,9 @@ public class DependentTaskTest {
TaskProps taskProps = new TaskProps();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskProps.setTaskInstId(252612);
taskProps.setDependence(dependString);
taskProps.setTaskStartTime(new Date());
DependentTask dependentTask = new DependentTask(taskProps, logger);
DependentTask dependentTask = new DependentTask(new TaskExecutionContext(), logger);
dependentTask.init();
dependentTask.handle();
Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS );
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册