未验证 提交 0c6ffdbf 编写于 作者: _和's avatar _和 提交者: GitHub

[Feature][DataQuality] Add data quality module (#4830)

* add data quality module

* add license

* add package configuration in dist pom

* fix license and jar import bug

* replace apache/skywalking-eyes@9bd5feb SHA

* refacotr jbdc-connector and writer

* modify parameter name in HiveConnector

* fix checkstyle error

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix checkstyle error in dolphinschesuler-dist

* fix duplicate code bug

* fix code style bug

* fix code smells

* update

* close e2e test

* fix spark rw postgresql bug

* change mysql driver scope

* add more sql driver

* remove hive-jdbc in pom.xml

* change the pom.xml
Co-authored-by: Nsunchaohe <sunzhaohe@linklogis.com>
上级 02373ad0
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-data-quality</artifactId>
<name>dolphinscheduler-data-quality</name>
<packaging>jar</packaging>
<properties>
<jackson.version>2.9.0</jackson.version>
<scope>provided</scope>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>${scope}</scope>
<exclusions>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>${scope}</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.apache.dolphinscheduler.data.quality.DataQualityApplication</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality;
/**
* Constants
*/
public final class Constants {
private Constants() {
throw new IllegalStateException("Construct Constants");
}
public static final String DATABASE = "database";
public static final String TABLE = "table";
public static final String URL = "url";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String DRIVER = "driver";
public static final String DEFAULT_DRIVER = "com.mysql.jdbc.Driver";
public static final String DEFAULT_DATABASE = "default";
public static final String EMPTY = "";
public static final String SQL = "sql";
public static final String DOTS = ".";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality;
import org.apache.dolphinscheduler.data.quality.configuration.DataQualityConfiguration;
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
import org.apache.dolphinscheduler.data.quality.flow.connector.ConnectorFactory;
import org.apache.dolphinscheduler.data.quality.flow.executor.SparkSqlExecuteTask;
import org.apache.dolphinscheduler.data.quality.flow.writer.WriterFactory;
import org.apache.dolphinscheduler.data.quality.utils.JsonUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataQualityApplication
*/
public class DataQualityApplication {
private static final Logger logger = LoggerFactory.getLogger(DataQualityApplication.class);
public static void main(String[] args) throws Exception {
if (args.length < 1) {
logger.error("Can not find DataQualityConfiguration");
System.exit(-1);
}
String dataQualityParameter = args[0];
DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(dataQualityParameter,DataQualityConfiguration.class);
if (dataQualityConfiguration == null) {
logger.info("DataQualityConfiguration is null");
System.exit(-1);
} else {
dataQualityConfiguration.validate();
}
SparkConf conf = new SparkConf().setAppName(dataQualityConfiguration.getName());
conf.set("spark.sql.crossJoin.enabled", "true");
SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
DataQualityContext context = new DataQualityContext(
sparkSession,
dataQualityConfiguration.getConnectorParameters(),
dataQualityConfiguration.getExecutorParameters(),
dataQualityConfiguration.getWriterParams());
execute(buildDataQualityFlow(context));
sparkSession.stop();
}
private static List<DataQualityTask> buildDataQualityFlow(DataQualityContext context) throws DataQualityException {
List<DataQualityTask> taskList =
new ArrayList<>(ConnectorFactory.getInstance().getConnectors(context));
taskList.add(new SparkSqlExecuteTask(context.getSparkSession(),context.getExecutorParameterList()));
taskList.addAll(WriterFactory.getInstance().getWriters(context));
return taskList;
}
private static void execute(List<DataQualityTask> taskList) {
for (DataQualityTask task: taskList) {
task.execute();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* ConnectorParameter
*/
public class ConnectorParameter implements IParameter {
@JsonProperty("type")
private String type;
@JsonProperty("config")
private Map<String,Object> config;
public ConnectorParameter(){
}
public ConnectorParameter(String type, Map<String,Object> config) {
this.type = type;
this.config = config;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Map<String, Object> getConfig() {
return config;
}
public void setConfig(Map<String, Object> config) {
this.config = config;
}
@Override
public void validate() {
Preconditions.checkArgument(StringUtils.isNotEmpty(type), "type should not be empty");
Preconditions.checkArgument(config != null, "config should not be empty");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* DataQualityConfiguration
*/
public class DataQualityConfiguration implements IParameter {
@JsonProperty("name")
private String name;
@JsonProperty("connectors")
private List<ConnectorParameter> connectorParameters;
@JsonProperty("writers")
private List<WriterParameter> writerParams;
@JsonProperty("executors")
private List<ExecutorParameter> executorParameters;
public DataQualityConfiguration(){}
public DataQualityConfiguration(String name,
List<ConnectorParameter> connectorParameters,
List<WriterParameter> writerParams,
List<ExecutorParameter> executorParameters) {
this.name = name;
this.connectorParameters = connectorParameters;
this.writerParams = writerParams;
this.executorParameters = executorParameters;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<ConnectorParameter> getConnectorParameters() {
return connectorParameters;
}
public void setConnectorParameters(List<ConnectorParameter> connectorParameters) {
this.connectorParameters = connectorParameters;
}
public List<WriterParameter> getWriterParams() {
return writerParams;
}
public void setWriterParams(List<WriterParameter> writerParams) {
this.writerParams = writerParams;
}
public List<ExecutorParameter> getExecutorParameters() {
return executorParameters;
}
public void setExecutorParameters(List<ExecutorParameter> executorParameters) {
this.executorParameters = executorParameters;
}
@Override
public void validate() {
Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty");
Preconditions.checkArgument(connectorParameters != null, "connector param should not be empty");
for (ConnectorParameter connectorParameter:connectorParameters) {
connectorParameter.validate();
}
Preconditions.checkArgument(writerParams != null, "writer param should not be empty");
for (WriterParameter writerParameter:writerParams) {
writerParameter.validate();
}
Preconditions.checkArgument(executorParameters != null, "executor param should not be empty");
for (ExecutorParameter executorParameter:executorParameters) {
executorParameter.validate();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* ExecutorParameter
*/
public class ExecutorParameter implements IParameter {
@JsonProperty("index")
private String index;
@JsonProperty("execute.sql")
private String executeSql;
@JsonProperty("table.alias")
private String tableAlias;
public ExecutorParameter() {
}
public ExecutorParameter(String index, String executeSql, String tableAlias) {
this.index = index;
this.executeSql = executeSql;
this.tableAlias = tableAlias;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getExecuteSql() {
return executeSql;
}
public void setExecuteSql(String executeSql) {
this.executeSql = executeSql;
}
public String getTableAlias() {
return tableAlias;
}
public void setTableAlias(String tableAlias) {
this.tableAlias = tableAlias;
}
@Override
public void validate() {
Preconditions.checkArgument(index != null, "index should not be empty");
Preconditions.checkArgument(executeSql != null, "executeSql should not be empty");
Preconditions.checkArgument(tableAlias != null, "tableAlias should not be empty");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
/**
* IParameter
*/
public interface IParameter {
/**
* check the parameter
*/
void validate();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* WriterParameter
*/
public class WriterParameter implements IParameter {
@JsonProperty("type")
private String type;
@JsonProperty("config")
private Map<String,Object> config;
public WriterParameter() {
}
public WriterParameter(String type, Map<String,Object> config) {
this.type = type;
this.config = config;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Map<String, Object> getConfig() {
return config;
}
public void setConfig(Map<String, Object> config) {
this.config = config;
}
@Override
public void validate() {
Preconditions.checkArgument(StringUtils.isNotEmpty(type), "type should not be empty");
Preconditions.checkArgument(config != null, "config should not be empty");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.context;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.dolphinscheduler.data.quality.configuration.ExecutorParameter;
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
import org.apache.spark.sql.SparkSession;
import java.util.List;
/**
* DataQualityContext
*/
public class DataQualityContext {
private SparkSession sparkSession;
private List<ConnectorParameter> connectorParameterList;
private List<ExecutorParameter> executorParameterList;
private List<WriterParameter> writerParamList;
public DataQualityContext() {
}
public DataQualityContext(SparkSession sparkSession,
List<ConnectorParameter> connectorParameterList,
List<ExecutorParameter> executorParameterList,
List<WriterParameter> writerParamList) {
this.sparkSession = sparkSession;
this.connectorParameterList = connectorParameterList;
this.executorParameterList = executorParameterList;
this.writerParamList = writerParamList;
}
public SparkSession getSparkSession() {
return sparkSession;
}
public void setSparkSession(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}
public List<ConnectorParameter> getConnectorParameterList() {
return connectorParameterList;
}
public void setConnectorParameterList(List<ConnectorParameter> connectorParameterList) {
this.connectorParameterList = connectorParameterList;
}
public List<ExecutorParameter> getExecutorParameterList() {
return executorParameterList;
}
public void setExecutorParameterList(List<ExecutorParameter> executorParameterList) {
this.executorParameterList = executorParameterList;
}
public List<WriterParameter> getWriterParamList() {
return writerParamList;
}
public void setWriterParamList(List<WriterParameter> writerParamList) {
this.writerParamList = writerParamList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.enums;
/**
* ConnectorType
*/
public enum ConnectorType {
/**
* JDBC
* HIVE
*/
JDBC,
HIVE;
public static ConnectorType getType(String name) {
for (ConnectorType type: ConnectorType.values()) {
if (type.name().equalsIgnoreCase(name)) {
return type;
}
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.enums;
/**
* WriterType
*/
public enum WriterType {
/**
* JDBC
*/
JDBC;
public static WriterType getType(String name) {
for (WriterType type: WriterType.values()) {
if (type.name().equalsIgnoreCase(name)) {
return type;
}
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.exception;
/**
* data quality exception
*/
public class DataQualityException extends Exception {
public DataQualityException() {
super();
}
/**
* Construct a new runtime exception with the detail message
*
* @param message detail message
*/
public DataQualityException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message the detail message
* @param cause the cause
* @since 1.4
*/
public DataQualityException(String message, Throwable cause) {
super(message, cause);
}
/**
* Construct a new runtime exception with throwable
*
* @param cause the cause
*/
public DataQualityException(Throwable cause) {
super(cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow;
/**
* DataQualityTask
*/
public interface DataQualityTask {
/**
* execute
*/
void execute();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow;
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DRIVER;
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
import static org.apache.dolphinscheduler.data.quality.Constants.URL;
import static org.apache.dolphinscheduler.data.quality.Constants.USER;
import org.apache.dolphinscheduler.data.quality.Constants;
import java.util.Map;
/**
* JdbcBaseConfig
*/
public class JdbcBaseConfig {
private String database;
private String table;
private String dbTable;
private String url;
private String user;
private String password;
private String driver;
public JdbcBaseConfig(Map<String,Object> config) {
database = String.valueOf(config.getOrDefault(DATABASE,DEFAULT_DATABASE));
table = String.valueOf(config.getOrDefault(TABLE,EMPTY));
dbTable = database + Constants.DOTS + table;
url = String.valueOf(config.getOrDefault(URL,EMPTY));
user = String.valueOf(config.getOrDefault(USER,EMPTY));
password = String.valueOf(config.getOrDefault(PASSWORD,EMPTY));
driver = String.valueOf(config.getOrDefault(DRIVER,DEFAULT_DRIVER));
}
public String getDatabase() {
return database;
}
public String getTable() {
return table;
}
public String getDbTable() {
return dbTable;
}
public String getUrl() {
return url;
}
public String getUser() {
return user;
}
public String getPassword() {
return password;
}
public String getDriver() {
return driver;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
import org.apache.dolphinscheduler.data.quality.enums.ConnectorType;
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
/**
* ConnectorFactory
*/
public class ConnectorFactory {
private static class Singleton {
static ConnectorFactory instance = new ConnectorFactory();
}
public static ConnectorFactory getInstance() {
return Singleton.instance;
}
public List<IConnector> getConnectors(DataQualityContext context) throws DataQualityException {
List<IConnector> connectorList = new ArrayList<>();
for (ConnectorParameter connectorParameter :context.getConnectorParameterList()) {
IConnector connector = getConnector(context.getSparkSession(), connectorParameter);
if (connector != null) {
connectorList.add(connector);
}
}
return connectorList;
}
private IConnector getConnector(SparkSession sparkSession,ConnectorParameter connectorParameter) throws DataQualityException {
ConnectorType connectorType = ConnectorType.getType(connectorParameter.getType());
if (connectorType != null) {
switch (connectorType) {
case HIVE:
return new HiveConnector(sparkSession, connectorParameter);
case JDBC:
return new JdbcConnector(sparkSession, connectorParameter);
default:
throw new DataQualityException("connector type ${connectorType} is not supported!");
}
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
import org.apache.dolphinscheduler.data.quality.Constants;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.spark.sql.SparkSession;
import java.util.Map;
/**
* HiveConnector
*/
public class HiveConnector implements IConnector {
private final SparkSession sparkSession;
private final ConnectorParameter connectorParameter;
public HiveConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) {
this.sparkSession = sparkSession;
this.connectorParameter = connectorParameter;
}
@Override
public void execute() {
Map<String,Object> config = connectorParameter.getConfig();
String database = String.valueOf(config.getOrDefault(DATABASE,DEFAULT_DATABASE));
String table = String.valueOf(config.getOrDefault(TABLE,EMPTY));
String dbTable = database + Constants.DOTS + table;
sparkSession.table(dbTable).createOrReplaceTempView(table);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
/**
* IConnector
*/
public interface IConnector extends DataQualityTask {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig;
import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import org.apache.spark.sql.SparkSession;
import java.util.Map;
/**
* JdbcConnector
*/
public class JdbcConnector implements IConnector {
private final SparkSession sparkSession;
private final ConnectorParameter connectorParameter;
public JdbcConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) {
this.sparkSession = sparkSession;
this.connectorParameter = connectorParameter;
}
@Override
public void execute() {
Map<String,Object> config = connectorParameter.getConfig();
JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config);
Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(jdbcBaseConfig.getDriver()), "JDBC driver $driver not present in classpath");
sparkSession
.read()
.format("jdbc")
.option("driver",jdbcBaseConfig.getDriver())
.option("url",jdbcBaseConfig.getUrl())
.option("dbtable", jdbcBaseConfig.getDbTable())
.option("user", jdbcBaseConfig.getUser())
.option("password", jdbcBaseConfig.getPassword())
.load().createOrReplaceTempView(jdbcBaseConfig.getTable());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.executor;
import org.apache.dolphinscheduler.data.quality.configuration.ExecutorParameter;
import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
import org.apache.spark.sql.SparkSession;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SparkSqlExecuteTask
*/
public class SparkSqlExecuteTask implements DataQualityTask {
private static final Logger logger = LoggerFactory.getLogger(SparkSqlExecuteTask.class);
private final SparkSession sparkSession;
private final List<ExecutorParameter> executorParameterList;
public SparkSqlExecuteTask(SparkSession sparkSession,List<ExecutorParameter> executorParameterList) {
this.sparkSession = sparkSession;
this.executorParameterList = executorParameterList;
}
@Override
public void execute() {
for (ExecutorParameter executorParameter : executorParameterList) {
if (StringUtils.isNotEmpty(executorParameter.getTableAlias())) {
sparkSession
.sql(executorParameter.getExecuteSql())
.createOrReplaceTempView(executorParameter.getTableAlias());
} else {
logger.error("lost table alias");
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.writer;
import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
/**
* IWriter
*/
public interface IWriter extends DataQualityTask {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.writer;
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
import static org.apache.dolphinscheduler.data.quality.Constants.SQL;
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig;
import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils;
import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Map;
/**
* JdbcWriter
*/
public class JdbcWriter implements IWriter {
private final SparkSession sparkSession;
private final WriterParameter writerParam;
public JdbcWriter(SparkSession sparkSession, WriterParameter writerParam) {
this.sparkSession = sparkSession;
this.writerParam = writerParam;
}
@Override
public void execute() {
Map<String,Object> config = writerParam.getConfig();
JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config);
String sql = String.valueOf(config.getOrDefault(SQL,EMPTY));
Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(jdbcBaseConfig.getDriver()), "JDBC driver $driver not present in classpath");
sparkSession.sql(sql)
.write()
.format("jdbc")
.option("driver",jdbcBaseConfig.getDriver())
.option("url",jdbcBaseConfig.getUrl())
.option("dbtable", jdbcBaseConfig.getTable())
.option("user", jdbcBaseConfig.getUser())
.option("password", jdbcBaseConfig.getPassword())
.mode(SaveMode.Append)
.save();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.writer;
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
import org.apache.dolphinscheduler.data.quality.enums.WriterType;
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
/**
* WriterFactory
*/
public class WriterFactory {
private static class Singleton {
static WriterFactory instance = new WriterFactory();
}
public static WriterFactory getInstance() {
return Singleton.instance;
}
public List<IWriter> getWriters(DataQualityContext context) throws DataQualityException {
List<IWriter> writerList = new ArrayList<>();
for (WriterParameter writerParam:context.getWriterParamList()) {
IWriter writer = getWriter(context.getSparkSession(),writerParam);
if (writer != null) {
writerList.add(writer);
}
}
return writerList;
}
private IWriter getWriter(SparkSession sparkSession,WriterParameter writerParam) throws DataQualityException {
WriterType writerType = WriterType.getType(writerParam.getType());
if (writerType != null) {
if (writerType == WriterType.JDBC) {
return new JdbcWriter(sparkSession, writerParam);
}
throw new DataQualityException("writer type $readerType is not supported!");
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JdbcUtil
*/
public class JdbcUtils {
private static final Logger logger = LoggerFactory.getLogger(JdbcUtils.class);
private JdbcUtils() {
throw new UnsupportedOperationException("Construct JdbcUtils");
}
public static boolean isJdbcDriverLoaded(String driver) {
try {
Class.forName(driver);
return true;
} catch (ClassNotFoundException e) {
logger.error("JDBC driver $driver provided is not found in class path", e);
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* JsonUtil
*/
public class JsonUtils {
private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class);
/**
* can use static singleton, inject: just make sure to reuse!
*/
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setTimeZone(TimeZone.getDefault());
private JsonUtils() {
throw new UnsupportedOperationException("Construct JSONUtils");
}
public static String toJson(Object object) {
try {
return MAPPER.writeValueAsString(object);
} catch (Exception e) {
logger.error("object to json exception!", e);
}
return null;
}
public static <T> T fromJson(String json, Class<T> clazz) {
if (StringUtils.isEmpty(json)) {
return null;
}
try {
return MAPPER.readValue(json, clazz);
} catch (Exception e) {
logger.error("parse object exception!", e);
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
/**
* utility methods for validating input
*/
public final class Preconditions {
private Preconditions() {
throw new UnsupportedOperationException("Construct Preconditions");
}
/**
* if condition is false will throw an IllegalArgumentException with the given message
*
* @param condition condition
* @param errorMsg error message
* @throws IllegalArgumentException Thrown, if the condition is violated.
*/
public static void checkArgument(boolean condition, Object errorMsg) {
if (!condition) {
throw new IllegalArgumentException(String.valueOf(errorMsg));
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
/**
* StringUtils
*/
public class StringUtils {
public static final String EMPTY = "";
private StringUtils() {
throw new IllegalStateException("Construct StringUtils");
}
public static boolean isEmpty(final CharSequence cs) {
return cs == null || cs.length() == 0;
}
public static boolean isNotEmpty(final CharSequence cs) {
return !isEmpty(cs);
}
public static boolean isBlank(String s) {
if (isEmpty(s)) {
return true;
}
return s.trim().length() == 0;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.Before;
/**
* SparkApplicationTestBase
*/
public class SparkApplicationTestBase {
protected SparkSession sparkSession;
@Before
public void before() {
SparkConf conf = new SparkConf().setAppName("data quality test");
conf.set("spark.sql.crossJoin.enabled", "true");
sparkSession = SparkSession.builder()
.master("local[4]")
.config(conf)
.getOrCreate();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.configuration;
import org.apache.dolphinscheduler.data.quality.utils.JsonUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* ConfigurationParserTest
*/
public class ConfigurationParserTest {
@Test
public void testConfigurationValidate() {
Assert.assertEquals(1,verifyConfigurationValidate());
}
private int verifyConfigurationValidate() {
int flag = 1;
try {
String parameterStr = "{\n"
+ "\t\"name\": \"\\u81EA\\u5B9A\\u4E49SQL\",\n"
+ "\t\"connectors\": [{\n"
+ "\t\t\"type\": \"JDBC\",\n"
+ "\t\t\"config\": {\n"
+ "\t\t\t\"database\": \"test\",\n"
+ "\t\t\t\"password\": \"123456\",\n"
+ "\t\t\t\"driver\": \"com.mysql.jdbc.Driver\",\n"
+ "\t\t\t\"user\": \"test\",\n"
+ "\t\t\t\"table\": \"test1\",\n"
+ "\t\t\t\"url\": \"jdbc:mysql://localhost:3306/test\"\n"
+ "\t\t}\n"
+ "\t}],\n"
+ "\t\"writers\": [{\n"
+ "\t\t\"type\": \"JDBC\",\n"
+ "\t\t\"config\": {\n"
+ "\t\t\t\"database\": \"dolphinscheduler\",\n"
+ "\t\t\t\"password\": \"Test@123!\",\n"
+ "\t\t\t\"driver\": \"com.mysql.jdbc.Driver\",\n"
+ "\t\t\t\"user\": \"test\",\n"
+ "\t\t\t\"table\": \"t_ds_dqs_result\",\n"
+ "\t\t\t\"url\": \"jdbc:mysql://localhost:3306/dolphinscheduler?characterEncoding=UTF-8&allowMultiQueries=true\",\n"
+ "\t\t\t\"sql\": \"SELECT 1 as rule_type,"
+ "'\\u81EA\\u5B9A\\u4E49SQL' as rule_name,"
+ "18 as process_definition_id,"
+ "64 as process_instance_id,"
+ "70 as task_instance_id,"
+ "mySum AS statistics_value, "
+ "total_count.total AS comparison_value,"
+ "0 as check_type,"
+ "6 as threshold, "
+ "0 as operator, "
+ "0 as failure_strategy, "
+ "'2021-01-31 15:00:07' as create_time,"
+ "'2021-01-31 15:00:07' as update_time from ( select sum(c4) as mySum from test1 ) tmp1 join total_count\"\n"
+ "\t\t}\n"
+ "\t}],\n"
+ "\t\"executors\": [{\n"
+ "\t\t\"index\": \"1\",\n"
+ "\t\t\"execute.sql\": \"SELECT COUNT(*) AS total FROM test1 WHERE (c3 != '55')\",\n"
+ "\t\t\"table.alias\": \"total_count\"\n"
+ "\t}]\n"
+ "}";
DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(parameterStr,DataQualityConfiguration.class);
dataQualityConfiguration.validate();
} catch (Exception e) {
flag = 0;
e.printStackTrace();
}
return flag;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow;
import org.apache.dolphinscheduler.data.quality.SparkApplicationTestBase;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
/**
* FlowTestBase
*/
public class FlowTestBase extends SparkApplicationTestBase {
protected String url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1";
protected String driver = "org.h2.Driver";
protected Connection getConnection() throws Exception {
Properties properties = new Properties();
properties.setProperty("user", "test");
properties.setProperty("password", "123456");
properties.setProperty("rowId", "false");
DriverManager.registerDriver(new org.h2.Driver());
Class.forName(driver, false, this.getClass().getClassLoader());
return DriverManager.getConnection(url, properties);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
import static org.apache.dolphinscheduler.data.quality.Constants.URL;
import static org.apache.dolphinscheduler.data.quality.Constants.USER;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
/**
* ConnectorFactoryTest
*/
public class ConnectorFactoryTest {
@Test
public void testConnectorGenerate() {
DataQualityContext context = new DataQualityContext();
List<ConnectorParameter> connectorParameters = new ArrayList<>();
ConnectorParameter connectorParameter = new ConnectorParameter();
connectorParameter.setType("JDBC");
Map<String,Object> config = new HashMap<>();
config.put(DATABASE,"test");
config.put(TABLE,"test1");
config.put(URL,"jdbc:mysql://localhost:3306/test");
config.put(USER,"test");
config.put(PASSWORD,"123456");
config.put(DRIVER,"com.mysql.jdbc.Driver");
connectorParameter.setConfig(config);
connectorParameter.setConfig(null);
connectorParameters.add(connectorParameter);
context.setConnectorParameterList(connectorParameters);
int flag = 0;
try {
List<IConnector> connectors = ConnectorFactory.getInstance().getConnectors(context);
if (connectors != null && connectors.size() >= 1) {
flag = 1;
}
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(1,flag);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.connector;
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
import static org.apache.dolphinscheduler.data.quality.Constants.URL;
import static org.apache.dolphinscheduler.data.quality.Constants.USER;
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
import org.apache.dolphinscheduler.data.quality.flow.FlowTestBase;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
/**
* JdbcConnectorTest
*/
public class JdbcConnectorTest extends FlowTestBase {
@Before
public void before() {
super.before();
createConnectorTable();
}
@Test
public void testJdbcConnectorExecute() {
JdbcConnector jdbcConnector = new JdbcConnector(sparkSession,buildConnectorParameter());
jdbcConnector.execute();
}
private ConnectorParameter buildConnectorParameter() {
ConnectorParameter connectorParameter = new ConnectorParameter();
connectorParameter.setType("JDBC");
Map<String,Object> config = new HashMap<>();
config.put(DATABASE,"test");
config.put(TABLE,"test1");
config.put(URL,url);
config.put(USER,"test");
config.put(PASSWORD,"123456");
config.put(DRIVER,driver);
connectorParameter.setConfig(config);
return connectorParameter;
}
private void createConnectorTable() {
try {
Connection connection = getConnection();
connection.prepareStatement("create schema if not exists test").executeUpdate();
connection.prepareStatement("drop table if exists test.test1").executeUpdate();
connection
.prepareStatement(
"CREATE TABLE test.test1 (\n"
+ " `id` int(11) NOT NULL AUTO_INCREMENT,\n"
+ " `company` varchar(255) DEFAULT NULL,\n"
+ " `date` varchar(255) DEFAULT NULL,\n"
+ " `c1` varchar(255) DEFAULT NULL,\n"
+ " `c2` varchar(255) DEFAULT NULL,\n"
+ " `c3` varchar(255) DEFAULT NULL,\n"
+ " `c4` int(11) DEFAULT NULL,\n"
+ " PRIMARY KEY (`id`)\n"
+ ")")
.executeUpdate();
connection.prepareStatement("INSERT INTO test.test1 (company,`date`,c1,c2,c3,c4) VALUES\n"
+ "\t ('1','2019-03-01','11','12','13',1),\n"
+ "\t ('2','2019-06-01','21','22','23',1),\n"
+ "\t ('3','2019-09-01','31','32','33',1),\n"
+ "\t ('4','2019-12-01','41','42','43',1),\n"
+ "\t ('5','2013','42','43','54',1),\n"
+ "\t ('6','2020','42','43','54',1);").executeUpdate();
connection.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.writer;
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
import static org.apache.dolphinscheduler.data.quality.Constants.URL;
import static org.apache.dolphinscheduler.data.quality.Constants.USER;
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
import org.apache.dolphinscheduler.data.quality.flow.FlowTestBase;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
/**
* JdbcWriterTest
*/
public class JdbcWriterTest extends FlowTestBase {
@Before
public void before() {
super.before();
createWriterTable();
}
@Test
public void testJdbcWriterExecute() {
JdbcWriter jdbcWriter = new JdbcWriter(sparkSession,buildWriterParameter());
jdbcWriter.execute();
}
private WriterParameter buildWriterParameter() {
WriterParameter writerParameter = new WriterParameter();
writerParameter.setType("JDBC");
Map<String,Object> config = new HashMap<>();
config.put(DATABASE,"test");
config.put(TABLE,"test2");
config.put(URL,url);
config.put(USER,"test");
config.put(PASSWORD,"123456");
config.put(DRIVER,driver);
config.put("sql","SELECT '1' as company,'1' as date,'2' as c1,'2' as c2,'2' as c3, 2 as c4");
writerParameter.setConfig(config);
return writerParameter;
}
private void createWriterTable() {
try {
Connection connection = getConnection();
connection.prepareStatement("create schema if not exists test").executeUpdate();
connection.prepareStatement("drop table if exists test.test2").executeUpdate();
connection
.prepareStatement(
"CREATE TABLE test.test2 (\n"
+ " `id` int(11) NOT NULL AUTO_INCREMENT,\n"
+ " `company` varchar(255) DEFAULT NULL,\n"
+ " `date` varchar(255) DEFAULT NULL,\n"
+ " `c1` varchar(255) DEFAULT NULL,\n"
+ " `c2` varchar(255) DEFAULT NULL,\n"
+ " `c3` varchar(255) DEFAULT NULL,\n"
+ " `c4` int(11) DEFAULT NULL,\n"
+ " PRIMARY KEY (`id`)\n"
+ ")")
.executeUpdate();
connection.prepareStatement("set schema test").executeUpdate();
connection.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.flow.writer;
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
* WriterFactoryTest
*/
public class WriterFactoryTest {
@Test
public void testWriterGenerate() {
DataQualityContext context = new DataQualityContext();
List<WriterParameter> writerParameters = new ArrayList<>();
WriterParameter writerParameter = new WriterParameter();
writerParameter.setType("JDBC");
writerParameter.setConfig(null);
writerParameters.add(writerParameter);
context.setWriterParamList(writerParameters);
int flag = 0;
try {
List<IWriter> writers = WriterFactory.getInstance().getWriters(context);
if (writers != null && writers.size() >= 1) {
flag = 1;
}
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(1,flag);
}
}
......@@ -41,6 +41,11 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-data-quality</artifactId>
</dependency>
</dependencies>
<profiles>
......
......@@ -151,6 +151,14 @@
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-data-quality/target/dolphinscheduler-data-quality-${project.version}</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-ui/dist</directory>
<includes>
......
......@@ -151,6 +151,14 @@
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-data-quality/target/dolphinscheduler-data-quality-${project.version}</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-ui/dist</directory>
<includes>
......
......@@ -120,6 +120,8 @@
<guava-retry.version>2.0.0</guava-retry.version>
<dep.airlift.version>0.184</dep.airlift.version>
<dep.packaging.version>${dep.airlift.version}</dep.packaging.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.0</spark.version>
</properties>
<dependencyManagement>
......@@ -241,6 +243,11 @@
<artifactId>dolphinscheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-data-quality</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
......@@ -355,11 +362,13 @@
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......@@ -592,6 +601,34 @@
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
......@@ -987,6 +1024,11 @@
<include>**/alert/processor/AlertRequestProcessorTest.java</include>
<include>**/alert/runner/AlertSenderTest.java</include>
<include>**/alert/AlertServerTest.java</include>
<include>**/data/quality/configuration/ConfigurationParserTest.java</include>
<include>**/data/quality/flow/connector/ConnectorFactoryTest.java</include>
<include>**/data/quality/flow/connector/JdbcConnectorTest.java</include>
<include>**/data/quality/flow/writer/WriterFactoryTest.java</include>
<include>**/data/quality/flow/writer/JdbcWriterTest.java</include>
</includes>
<!-- <skip>true</skip> -->
</configuration>
......@@ -1105,5 +1147,6 @@
<module>dolphinscheduler-service</module>
<module>dolphinscheduler-spi</module>
<module>dolphinscheduler-microbench</module>
<module>dolphinscheduler-data-quality</module>
</modules>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册