Commit e0f53cfe authored by lgcareer, committed by bao liang

get processDao by DaoFactory (#994)

* rename from DatasourceUserMapper to DataSourceUserMapper

* add unit test in UserMapper and WorkerGroupMapper

* change cn.escheduler to org.apache.dolphinscheduler

* add unit test in UdfFuncMapperTest

* remove DatabaseConfiguration

* add ConnectionFactoryTest

* calculate duration in processInstancesList

* change desc to description

* change table name in mysql ddl

* change escheduler to dolphinscheduler

* remove log4j-1.2-api and modify AlertMapperTest

* remove log4j-1.2-api

* Add alertDao to spring management

* get SqlSessionFactory from MybatisSqlSessionFactoryBean

* get processDao by DaoFactory

* read druid properties in ConnectionFactory

Parent 38f38457
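Many of the hunks below replace a constructor-injected ProcessDao with DaoFactory.getDaoInstance(ProcessDao.class). For context, a minimal sketch of that factory pattern follows: a class-keyed cache that hands every caller the same DAO instance. The cache field, reflective construction, and error handling here are illustrative assumptions, not the committed implementation.

```java
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the DaoFactory pattern used throughout this commit;
// the details below are assumptions for illustration.
public final class DaoFactory {

    // one shared instance per DAO class
    private static final ConcurrentHashMap<Class<?>, Object> DAO_CACHE = new ConcurrentHashMap<>();

    private DaoFactory() {
        // static factory only
    }

    @SuppressWarnings("unchecked")
    public static <T> T getDaoInstance(Class<T> clazz) {
        // create the DAO reflectively on first request, then reuse it
        return (T) DAO_CACHE.computeIfAbsent(clazz, key -> {
            try {
                return key.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("cannot create DAO instance: " + key.getName(), e);
            }
        });
    }
}
```

A task then obtains its DAO with `this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);`, exactly as the task constructors in the hunks below do.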
......@@ -53,7 +53,7 @@ public final class Constants {
/**
* dao properties path
*/
public static final String DAO_PROPERTIES_PATH = "/dao/data_source.properties";
public static final String DAO_PROPERTIES_PATH = "application.properties";
/**
* fs.defaultFS
......@@ -470,7 +470,7 @@ public final class Constants {
/**
* task record configuration path
*/
public static final String DATA_SOURCE_PROPERTIES = "dao/data_source.properties";
public static final String APPLICATION_PROPERTIES = "application.properties";
public static final String TASK_RECORD_URL = "task.record.datasource.url";
......
......@@ -45,7 +45,7 @@ public class MonitorDBDao {
static {
try {
conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES);
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){
logger.error("load configuration excetpion",e);
System.exit(1);
......
......@@ -46,6 +46,7 @@ import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.apache.dolphinscheduler.dao.datasource.ConnectionFactory.getMapper;
/**
* process related dao holding the mappers used here.
......@@ -118,6 +119,21 @@ public class ProcessDao extends AbstractBaseDao {
@Override
protected void init() {
taskQueue = TaskQueueFactory.getTaskQueueInstance();
userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
processInstanceMapMapper = getMapper(ProcessInstanceMapMapper.class);
taskInstanceMapper = getMapper(TaskInstanceMapper.class);
commandMapper = getMapper(CommandMapper.class);
scheduleMapper = getMapper(ScheduleMapper.class);
udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance();
tenantMapper = getMapper(TenantMapper.class);
}
......
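The static import of ConnectionFactory.getMapper above is what the new init() uses to wire each mapper. A plausible shape for that helper, assuming it lazily wraps the SqlSessionFactory (built in the ConnectionFactory hunk below) in a single thread-safe SqlSessionTemplate; the caching details are an assumption, not the committed code:

```java
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;

// Sketch of ConnectionFactory.getMapper; the caching strategy is assumed.
public class ConnectionFactory {

    private static SqlSessionTemplate sqlSessionTemplate;

    /**
     * Hand out a MyBatis mapper backed by one shared, thread-safe session template.
     */
    public static synchronized <T> T getMapper(Class<T> type) {
        try {
            if (sqlSessionTemplate == null) {
                SqlSessionFactory factory = getSqlSessionFactory();
                sqlSessionTemplate = new SqlSessionTemplate(factory);
            }
            return sqlSessionTemplate.getMapper(type);
        } catch (Exception e) {
            throw new RuntimeException("get mapper of " + type.getName() + " failed", e);
        }
    }

    private static SqlSessionFactory getSqlSessionFactory() throws Exception {
        // stand-in for the real method shown in the ConnectionFactory diff below
        throw new UnsupportedOperationException("see the diff below");
    }
}
```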
......@@ -49,7 +49,7 @@ public class TaskRecordDao {
static {
try {
conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES);
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){
logger.error("load configuration excetpion",e);
System.exit(1);
......
......@@ -30,7 +30,6 @@ import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
......@@ -50,7 +49,7 @@ public class ConnectionFactory {
static {
try {
conf = new PropertiesConfiguration(Constants.DATA_SOURCE_PROPERTIES);
conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
}catch (ConfigurationException e){
logger.error("load configuration excetpion",e);
System.exit(1);
......@@ -61,19 +60,33 @@ public class ConnectionFactory {
* get the data source
*/
public static DruidDataSource getDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
// Map<String, String> allMap = YmlConfig.allMap;
druidDataSource.setDriverClassName(conf.getString("spring.datasource.driver-class-name"));
druidDataSource.setUrl(conf.getString("spring.datasource.url"));
druidDataSource.setUsername(conf.getString("spring.datasource.username"));
druidDataSource.setPassword(conf.getString("spring.datasource.password"));
druidDataSource.setInitialSize(5);
druidDataSource.setMinIdle(5);
druidDataSource.setMaxActive(20);
druidDataSource.setMaxWait(60000);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(300000);
druidDataSource.setValidationQuery("SELECT 1");
druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL));
druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME));
druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD));
druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY));
druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS));
druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE));
druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW));
druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN));
druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE));
druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE));
druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE));
druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT));
druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE));
druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE));
druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS));
druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS));
druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS));
druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT));
// auto commit
druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT));
return druidDataSource;
}
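The Constants.SPRING_DATASOURCE_* symbols used above are presumably plain property-key strings matching the keys in the application.properties file added later in this commit. A representative sketch; the literal values are inferred from that file, not copied from the committed Constants class:

```java
// Inferred property-key constants; values assumed to match application.properties.
public static final String SPRING_DATASOURCE_DRIVER_CLASS_NAME = "spring.datasource.driver-class-name";
public static final String SPRING_DATASOURCE_URL = "spring.datasource.url";
public static final String SPRING_DATASOURCE_USERNAME = "spring.datasource.username";
public static final String SPRING_DATASOURCE_PASSWORD = "spring.datasource.password";
public static final String SPRING_DATASOURCE_VALIDATION_QUERY = "spring.datasource.validationQuery";
public static final String SPRING_DATASOURCE_MIN_IDLE = "spring.datasource.minIdle";
public static final String SPRING_DATASOURCE_MAX_ACTIVE = "spring.datasource.maxActive";
public static final String SPRING_DATASOURCE_MAX_WAIT = "spring.datasource.maxWait";
```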
......@@ -95,11 +108,12 @@ public class ConnectionFactory {
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:org/apache/dolphinscheduler/dao/mapper/*.xml"));
sqlSessionFactoryBean.setConfiguration(configuration);
return sqlSessionFactoryBean.getObject();
sqlSessionFactory = sqlSessionFactoryBean.getObject();
return sqlSessionFactory;
}
}
}
......
# mysql
# url=jdbc:postgresql://192.168.220.154:5432/dolphinscheduler
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
#spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root@123
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# driver-class-name=org.postgresql.Driver
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# connection configuration
spring.datasource.initialSize=5
# min connection number
spring.datasource.minIdle=5
spring.datasource.maxActive=20
# max connection number
spring.datasource.maxActive=50
# max wait time to get a connection, in milliseconds. If maxWait is configured, fair locks are enabled by default and concurrency efficiency decreases.
# If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true.
spring.datasource.maxWait=60000
# interval, in milliseconds, between checks that close idle connections
spring.datasource.timeBetweenEvictionRunsMillis=60000
# the destroy thread checks connections at this interval, in milliseconds, and closes the physical connection if its idle time is greater than or equal to minEvictableIdleTimeMillis.
spring.datasource.timeBetweenConnectErrorMillis=60000
# the longest time a connection remains idle without being evicted, in milliseconds
spring.datasource.minEvictableIdleTimeMillis=300000
# the SQL used to check whether the connection is valid; it must be a query statement. If validationQuery is null, testOnBorrow, testOnReturn, and testWhileIdle will not work.
spring.datasource.validationQuery=SELECT 1 FROM DUAL
# timeout for the connection validity check, in seconds
spring.datasource.validationQueryTimeout=3
# when borrowing a connection, if it has been idle longer than timeBetweenEvictionRunsMillis,
# validationQuery is executed to check whether the connection is still valid
spring.datasource.testWhileIdle=true
spring.datasource.testOnBorrow=false
# run validationQuery to check whether the connection is valid when a connection is borrowed
spring.datasource.testOnBorrow=true
# run validationQuery to check whether the connection is valid when a connection is returned
spring.datasource.testOnReturn=false
spring.datasource.defaultAutoCommit=true
spring.datasource.keepAlive=true
# enable PSCache and specify the PSCache size for every connection
spring.datasource.poolPreparedStatements=true
spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
spring.datasource.filters=stat,wall,log4j
spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
......
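ConnectionFactory, MonitorDBDao, and TaskRecordDao now all load these values through Commons Configuration, so reading the pool settings above reduces to something like the following standalone sketch (the demo class name is hypothetical):

```java
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

// Hypothetical demo mirroring the static initializers that read application.properties.
public class DruidPropertiesDemo {

    public static void main(String[] args) throws ConfigurationException {
        // loads application.properties from the classpath, as the static blocks above do
        PropertiesConfiguration conf = new PropertiesConfiguration("application.properties");

        int maxActive = conf.getInt("spring.datasource.maxActive");               // 50
        long maxWait = conf.getLong("spring.datasource.maxWait");                 // 60000
        boolean testOnBorrow = conf.getBoolean("spring.datasource.testOnBorrow"); // true

        System.out.println("maxActive=" + maxActive
                + ", maxWait=" + maxWait
                + ", testOnBorrow=" + testOnBorrow);
    }
}
```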
......@@ -142,7 +142,7 @@ public class TaskScheduleThread implements Runnable {
task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps,
taskLogger,processDao);
taskLogger);
// task init
task.init();
......
......@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
......@@ -42,7 +41,6 @@ public abstract class AbstractYarnTask extends AbstractTask {
/**
* process database access
*/
@Autowired
protected ProcessDao processDao;
/**
......@@ -50,9 +48,9 @@ public abstract class AbstractYarnTask extends AbstractTask {
* @param logger
* @throws IOException
*/
public AbstractYarnTask(TaskProps taskProps, Logger logger,ProcessDao processDao) {
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.processDao = processDao;
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),
......
......@@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
......@@ -45,27 +44,27 @@ public class TaskManager {
* @return
* @throws IllegalArgumentException
*/
public static AbstractTask newTask(String taskType, TaskProps props, Logger logger,ProcessDao processDao)
public static AbstractTask newTask(String taskType, TaskProps props, Logger logger)
throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskType)) {
case SHELL:
return new ShellTask(props, logger,processDao);
return new ShellTask(props, logger);
case PROCEDURE:
return new ProcedureTask(props, logger,processDao);
return new ProcedureTask(props, logger);
case SQL:
return new SqlTask(props, logger,processDao);
return new SqlTask(props, logger);
case MR:
return new MapReduceTask(props, logger,processDao);
return new MapReduceTask(props, logger);
case SPARK:
return new SparkTask(props, logger,processDao);
return new SparkTask(props, logger);
case FLINK:
return new FlinkTask(props, logger,processDao);
return new FlinkTask(props, logger);
case PYTHON:
return new PythonTask(props, logger,processDao);
return new PythonTask(props, logger);
case DEPENDENT:
return new DependentTask(props, logger,processDao);
return new DependentTask(props, logger);
case HTTP:
return new HttpTask(props, logger,processDao);
return new HttpTask(props, logger);
default:
logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type");
......
......@@ -52,9 +52,8 @@ public class DependentTask extends AbstractTask {
private ProcessDao processDao;
public DependentTask(TaskProps props, Logger logger,ProcessDao processDao) {
public DependentTask(TaskProps props, Logger logger) {
super(props, logger);
this.processDao = processDao;
}
@Override
......@@ -69,7 +68,7 @@ public class DependentTask extends AbstractTask {
taskModel.getDependItemList(), taskModel.getRelation()));
}
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
......
......@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
......@@ -50,8 +49,8 @@ public class FlinkTask extends AbstractYarnTask {
*/
private FlinkParameters flinkParameters;
public FlinkTask(TaskProps props, Logger logger,ProcessDao processDao) {
super(props, logger,processDao);
public FlinkTask(TaskProps props, Logger logger) {
super(props, logger);
}
@Override
......
......@@ -75,9 +75,9 @@ public class HttpTask extends AbstractTask {
protected String output;
public HttpTask(TaskProps props, Logger logger,ProcessDao processDao) {
public HttpTask(TaskProps props, Logger logger) {
super(props, logger);
this.processDao = processDao;
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@Override
......
......@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......@@ -49,8 +48,8 @@ public class MapReduceTask extends AbstractYarnTask {
* @param props
* @param logger
*/
public MapReduceTask(TaskProps props, Logger logger,ProcessDao processDao) {
super(props, logger,processDao);
public MapReduceTask(TaskProps props, Logger logger) {
super(props, logger);
}
@Override
......
......@@ -64,7 +64,7 @@ public class ProcedureTask extends AbstractTask {
*/
private BaseDataSource baseDataSource;
public ProcedureTask(TaskProps taskProps, Logger logger,ProcessDao processDao) {
public ProcedureTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
logger.info("procedure task params {}", taskProps.getTaskParams());
......@@ -76,7 +76,7 @@ public class ProcedureTask extends AbstractTask {
throw new RuntimeException("procedure task params is not valid");
}
this.processDao = processDao;
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@Override
......
......@@ -59,7 +59,7 @@ public class PythonTask extends AbstractTask {
private ProcessDao processDao;
public PythonTask(TaskProps taskProps, Logger logger,ProcessDao processDao) {
public PythonTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.taskDir = taskProps.getTaskDir();
......@@ -73,7 +73,7 @@ public class PythonTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = processDao;
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@Override
......
......@@ -61,7 +61,7 @@ public class ShellTask extends AbstractTask {
private ProcessDao processDao;
public ShellTask(TaskProps taskProps, Logger logger,ProcessDao processDao) {
public ShellTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.taskDir = taskProps.getTaskDir();
......@@ -74,7 +74,7 @@ public class ShellTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = processDao;
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@Override
......
......@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
......@@ -48,8 +47,8 @@ public class SparkTask extends AbstractYarnTask {
*/
private SparkParameters sparkParameters;
public SparkTask(TaskProps props, Logger logger,ProcessDao processDao) {
super(props, logger,processDao);
public SparkTask(TaskProps props, Logger logger) {
super(props, logger);
}
@Override
......
......@@ -47,7 +47,6 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import java.sql.*;
import java.util.*;
......@@ -75,7 +74,6 @@ public class SqlTask extends AbstractTask {
/**
* alert dao
*/
@Autowired
private AlertDao alertDao;
/**
......@@ -89,7 +87,7 @@ public class SqlTask extends AbstractTask {
private BaseDataSource baseDataSource;
public SqlTask(TaskProps taskProps, Logger logger,ProcessDao processDao) {
public SqlTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
logger.info("sql task params {}", taskProps.getTaskParams());
......@@ -98,8 +96,8 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
this.processDao = processDao;
// this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
this.alertDao = DaoFactory.getDaoInstance(AlertDao.class);
}
@Override
......
......@@ -79,7 +79,7 @@ public class ShellCommandExecutorTest {
taskInstance.getId()));
AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger,null);
AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
logger.info("task info : {}", task);
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.dao.DaoFactory;
......@@ -55,7 +56,7 @@ public class SqlExecutorTest {
String nodeName = "mysql sql test";
String taskAppId = "51_11282_263978";
String tenantCode = "hdfs";
int taskInstId = 263978;
int taskInstId = 7;
sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
}
......@@ -105,7 +106,7 @@ public class SqlExecutorTest {
taskProps.setTaskTimeout(360000);
taskProps.setTaskInstId(taskInstId);
taskProps.setNodeName(nodeName);
taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
......@@ -122,7 +123,7 @@ public class SqlExecutorTest {
taskInstance.getId()));
AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger,null);
AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
logger.info("task info : {}", task);
......
......@@ -52,7 +52,7 @@ public class DependentTaskTest {
taskProps.setTaskInstId(252612);
taskProps.setDependence(dependString);
DependentTask dependentTask = new DependentTask(taskProps, logger,null);
DependentTask dependentTask = new DependentTask(taskProps, logger);
dependentTask.init();
dependentTask.handle();
Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE );
......