diff --git a/dolphinscheduler-data-quality/pom.xml b/dolphinscheduler-data-quality/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..17aed4e10b0a77880374327a6090644e1d8c31ba
--- /dev/null
+++ b/dolphinscheduler-data-quality/pom.xml
@@ -0,0 +1,140 @@
+
+
+
+
+ dolphinscheduler
+ org.apache.dolphinscheduler
+ 1.3.4-SNAPSHOT
+
+ 4.0.0
+ dolphinscheduler-data-quality
+ dolphinscheduler-data-quality
+
+ jar
+
+
+ 2.9.0
+ provided
+
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${scope}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${scope}
+
+
+
+ org.apache.spark
+ spark-hive_${scala.binary.version}
+ ${scope}
+
+
+ commons-httpclient
+ commons-httpclient
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+ ${scope}
+
+
+
+ com.h2database
+ h2
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+
+ mysql
+ mysql-connector-java
+
+
+
+ org.postgresql
+ postgresql
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 2.2
+
+ false
+
+ jar-with-dependencies
+
+
+
+ org.apache.dolphinscheduler.data.quality.DataQualityApplication
+
+
+
+
+
+ make-assembly
+ package
+
+ assembly
+
+
+
+
+
+ maven-source-plugin
+ 2.1
+
+ true
+
+
+
+ compile
+
+ jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/Constants.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/Constants.java
new file mode 100644
index 0000000000000000000000000000000000000000..224ec9a8e1ec3c36e0a5f149f7573c2ac54093f5
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/Constants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality;
+
+/**
+ * String constants shared across the data quality module.
+ * Holds config-map keys, fallback values and a few small literals.
+ */
+public final class Constants {
+
+    // JdbcBaseConfig reads DATABASE from its config map; the other keys are presumably read alike
+    public static final String DATABASE = "database";
+    public static final String TABLE = "table";
+    public static final String URL = "url";
+    public static final String USER = "user";
+    public static final String PASSWORD = "password";
+    public static final String DRIVER = "driver";
+
+    // fallback values
+    public static final String DEFAULT_DRIVER = "com.mysql.jdbc.Driver";
+    /** Database name JdbcBaseConfig falls back to when the config has no DATABASE entry. */
+    public static final String DEFAULT_DATABASE = "default";
+
+    // miscellaneous literals
+    public static final String EMPTY = "";
+    public static final String SQL = "sql";
+    public static final String DOTS = ".";
+
+    /**
+     * Always throws: the class only holds constants and must not be instantiated.
+     */
+    private Constants() {
+        throw new IllegalStateException("Construct Constants");
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java
new file mode 100644
index 0000000000000000000000000000000000000000..6d445b781d1a0123a5e8ad70aae2ca775e0796ef
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/DataQualityApplication.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality;
+
+import org.apache.dolphinscheduler.data.quality.configuration.DataQualityConfiguration;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
+import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
+import org.apache.dolphinscheduler.data.quality.flow.connector.ConnectorFactory;
+import org.apache.dolphinscheduler.data.quality.flow.executor.SparkSqlExecuteTask;
+import org.apache.dolphinscheduler.data.quality.flow.writer.WriterFactory;
+import org.apache.dolphinscheduler.data.quality.utils.JsonUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataQualityApplication
+ */
+public class DataQualityApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataQualityApplication.class);
+
+ public static void main(String[] args) throws Exception {
+
+ if (args.length < 1) {
+ logger.error("Can not find DataQualityConfiguration");
+ System.exit(-1);
+ }
+
+ String dataQualityParameter = args[0];
+
+ DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(dataQualityParameter,DataQualityConfiguration.class);
+ if (dataQualityConfiguration == null) {
+ logger.info("DataQualityConfiguration is null");
+ System.exit(-1);
+ } else {
+ dataQualityConfiguration.validate();
+ }
+
+ SparkConf conf = new SparkConf().setAppName(dataQualityConfiguration.getName());
+ conf.set("spark.sql.crossJoin.enabled", "true");
+ SparkSession sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
+
+ DataQualityContext context = new DataQualityContext(
+ sparkSession,
+ dataQualityConfiguration.getConnectorParameters(),
+ dataQualityConfiguration.getExecutorParameters(),
+ dataQualityConfiguration.getWriterParams());
+
+ execute(buildDataQualityFlow(context));
+ sparkSession.stop();
+ }
+
+ private static List buildDataQualityFlow(DataQualityContext context) throws DataQualityException {
+ List taskList =
+ new ArrayList<>(ConnectorFactory.getInstance().getConnectors(context));
+ taskList.add(new SparkSqlExecuteTask(context.getSparkSession(),context.getExecutorParameterList()));
+ taskList.addAll(WriterFactory.getInstance().getWriters(context));
+
+ return taskList;
+ }
+
+ private static void execute(List taskList) {
+ for (DataQualityTask task: taskList) {
+ task.execute();
+ }
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ConnectorParameter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ConnectorParameter.java
new file mode 100644
index 0000000000000000000000000000000000000000..81c2d8e1a3c89e0d800d7fffbb983c3ba7ccd235
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ConnectorParameter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * One connector entry of the job configuration: a type name plus a key/value config map.
+ */
+public class ConnectorParameter implements IParameter {
+
+    @JsonProperty("type")
+    private String type;
+
+    @JsonProperty("config")
+    private Map<String, Object> config;
+
+    public ConnectorParameter() {}
+
+    public ConnectorParameter(String type, Map<String, Object> config) {
+        this.type = type;
+        this.config = config;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /** Requires a non-empty type and a non-null config map; an empty map passes the check. */
+    @Override
+    public void validate() {
+        Preconditions.checkArgument(StringUtils.isNotEmpty(type), "type should not be empty");
+        Preconditions.checkArgument(config != null, "config should not be empty");
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/DataQualityConfiguration.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/DataQualityConfiguration.java
new file mode 100644
index 0000000000000000000000000000000000000000..72beac0817c039d56f046a7837cfb9857e6b7e00
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/DataQualityConfiguration.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * DataQualityConfiguration
+ */
+public class DataQualityConfiguration implements IParameter {
+
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("connectors")
+ private List connectorParameters;
+
+ @JsonProperty("writers")
+ private List writerParams;
+
+ @JsonProperty("executors")
+ private List executorParameters;
+
+ public DataQualityConfiguration(){}
+
+ public DataQualityConfiguration(String name,
+ List connectorParameters,
+ List writerParams,
+ List executorParameters) {
+ this.name = name;
+ this.connectorParameters = connectorParameters;
+ this.writerParams = writerParams;
+ this.executorParameters = executorParameters;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List getConnectorParameters() {
+ return connectorParameters;
+ }
+
+ public void setConnectorParameters(List connectorParameters) {
+ this.connectorParameters = connectorParameters;
+ }
+
+ public List getWriterParams() {
+ return writerParams;
+ }
+
+ public void setWriterParams(List writerParams) {
+ this.writerParams = writerParams;
+ }
+
+ public List getExecutorParameters() {
+ return executorParameters;
+ }
+
+ public void setExecutorParameters(List executorParameters) {
+ this.executorParameters = executorParameters;
+ }
+
+ @Override
+ public void validate() {
+ Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty");
+ Preconditions.checkArgument(connectorParameters != null, "connector param should not be empty");
+ for (ConnectorParameter connectorParameter:connectorParameters) {
+ connectorParameter.validate();
+ }
+ Preconditions.checkArgument(writerParams != null, "writer param should not be empty");
+ for (WriterParameter writerParameter:writerParams) {
+ writerParameter.validate();
+ }
+ Preconditions.checkArgument(executorParameters != null, "executor param should not be empty");
+ for (ExecutorParameter executorParameter:executorParameters) {
+ executorParameter.validate();
+ }
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ExecutorParameter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ExecutorParameter.java
new file mode 100644
index 0000000000000000000000000000000000000000..553efe779ed1194ae3a51e1ff23f3fd250563976
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/ExecutorParameter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * One executor entry of the job configuration: an index, the SQL to execute and a table alias.
+ */
+public class ExecutorParameter implements IParameter {
+
+    @JsonProperty("index")
+    private String index;
+
+    @JsonProperty("execute.sql")
+    private String executeSql;
+
+    @JsonProperty("table.alias")
+    private String tableAlias;
+
+    public ExecutorParameter() {}
+
+    public ExecutorParameter(String index, String executeSql, String tableAlias) {
+        this.tableAlias = tableAlias;
+        this.executeSql = executeSql;
+        this.index = index;
+    }
+
+    public String getIndex() {
+        return this.index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getExecuteSql() {
+        return this.executeSql;
+    }
+
+    public void setExecuteSql(String executeSql) {
+        this.executeSql = executeSql;
+    }
+
+    public String getTableAlias() {
+        return this.tableAlias;
+    }
+
+    public void setTableAlias(String tableAlias) {
+        this.tableAlias = tableAlias;
+    }
+
+    /** Requires index, executeSql and tableAlias to be non-null; empty strings pass the check. */
+    @Override
+    public void validate() {
+        Preconditions.checkArgument(null != this.index, "index should not be empty");
+        Preconditions.checkArgument(null != this.executeSql, "executeSql should not be empty");
+        Preconditions.checkArgument(null != this.tableAlias, "tableAlias should not be empty");
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/IParameter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/IParameter.java
new file mode 100644
index 0000000000000000000000000000000000000000..1cd6d6e9eeedac4d26c7ff30a954b163056fce60
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/IParameter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+/**
+ * Implemented by configuration objects that can check their own contents.
+ */
+public interface IParameter {
+
+ /**
+ * Checks this parameter object; the implementations in this package verify required fields.
+ */
+ void validate();
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/WriterParameter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/WriterParameter.java
new file mode 100644
index 0000000000000000000000000000000000000000..7b841a5c660e700ca1d9adb8944b71af1f091a72
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/configuration/WriterParameter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * One writer entry of the job configuration: a type name plus a key/value config map.
+ */
+public class WriterParameter implements IParameter {
+
+    @JsonProperty("type")
+    private String type;
+
+    @JsonProperty("config")
+    private Map<String, Object> config;
+
+    public WriterParameter() {}
+
+    public WriterParameter(String type, Map<String, Object> config) {
+        this.type = type;
+        this.config = config;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public Map<String, Object> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /** Requires a non-empty type and a non-null config map; an empty map passes the check. */
+    @Override
+    public void validate() {
+        Preconditions.checkArgument(StringUtils.isNotEmpty(type), "type should not be empty");
+        Preconditions.checkArgument(config != null, "config should not be empty");
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/context/DataQualityContext.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/context/DataQualityContext.java
new file mode 100644
index 0000000000000000000000000000000000000000..c6f2f8b78861abdcf0f1237d46847befe629b14c
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/context/DataQualityContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.context;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+import org.apache.dolphinscheduler.data.quality.configuration.ExecutorParameter;
+import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+
+/**
+ * DataQualityContext
+ */
+public class DataQualityContext {
+
+ private SparkSession sparkSession;
+
+ private List connectorParameterList;
+
+ private List executorParameterList;
+
+ private List writerParamList;
+
+ public DataQualityContext() {
+ }
+
+ public DataQualityContext(SparkSession sparkSession,
+ List connectorParameterList,
+ List executorParameterList,
+ List writerParamList) {
+ this.sparkSession = sparkSession;
+ this.connectorParameterList = connectorParameterList;
+ this.executorParameterList = executorParameterList;
+ this.writerParamList = writerParamList;
+ }
+
+ public SparkSession getSparkSession() {
+ return sparkSession;
+ }
+
+ public void setSparkSession(SparkSession sparkSession) {
+ this.sparkSession = sparkSession;
+ }
+
+ public List getConnectorParameterList() {
+ return connectorParameterList;
+ }
+
+ public void setConnectorParameterList(List connectorParameterList) {
+ this.connectorParameterList = connectorParameterList;
+ }
+
+ public List getExecutorParameterList() {
+ return executorParameterList;
+ }
+
+ public void setExecutorParameterList(List executorParameterList) {
+ this.executorParameterList = executorParameterList;
+ }
+
+ public List getWriterParamList() {
+ return writerParamList;
+ }
+
+ public void setWriterParamList(List writerParamList) {
+ this.writerParamList = writerParamList;
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/ConnectorType.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/ConnectorType.java
new file mode 100644
index 0000000000000000000000000000000000000000..b388559c5b84bef9d5b49a599cc7a24fec04e1db
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/ConnectorType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.enums;
+
+/**
+ * ConnectorType
+ */
+public enum ConnectorType {
+ /**
+ * JDBC
+ * HIVE
+ */
+ JDBC,
+ HIVE;
+
+ public static ConnectorType getType(String name) {
+ for (ConnectorType type: ConnectorType.values()) {
+ if (type.name().equalsIgnoreCase(name)) {
+ return type;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/WriterType.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/WriterType.java
new file mode 100644
index 0000000000000000000000000000000000000000..dbb073d467d201a655bc5519291163ccc243065d
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/enums/WriterType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.enums;
+
+/**
+ * WriterType
+ */
+public enum WriterType {
+ /**
+ * JDBC
+ */
+ JDBC;
+
+ public static WriterType getType(String name) {
+ for (WriterType type: WriterType.values()) {
+ if (type.name().equalsIgnoreCase(name)) {
+ return type;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/exception/DataQualityException.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/exception/DataQualityException.java
new file mode 100644
index 0000000000000000000000000000000000000000..34df8ad6cf51477b58c8eacce34e3308939bf3d6
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/exception/DataQualityException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.exception;
+
+/**
+ * Checked exception for failures in the data quality module.
+ */
+public class DataQualityException extends Exception {
+
+ /** Constructs an exception with no detail message and no cause. */
+ public DataQualityException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the given detail message.
+ *
+ * @param message detail message
+ */
+ public DataQualityException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the given detail message and cause.
+ *
+ * @param message the detail message
+ * @param cause the cause
+ */
+ public DataQualityException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception that wraps the given cause.
+ *
+ * @param cause the cause
+ */
+ public DataQualityException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/DataQualityTask.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/DataQualityTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..e24826727a4d750b9497823e52a3666ff32f500f
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/DataQualityTask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow;
+
+/**
+ * A single executable step of a data quality flow.
+ */
+public interface DataQualityTask {
+
+ /**
+ * Runs this step; DataQualityApplication calls it once per task, in list order.
+ */
+ void execute();
+
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/JdbcBaseConfig.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/JdbcBaseConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..855cc0b01e266ea0b3eaf8fe117d6c70d50439de
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/JdbcBaseConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DRIVER;
+import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
+import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
+import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
+import static org.apache.dolphinscheduler.data.quality.Constants.URL;
+import static org.apache.dolphinscheduler.data.quality.Constants.USER;
+
+import org.apache.dolphinscheduler.data.quality.Constants;
+
+import java.util.Map;
+
+/**
+ * JDBC connection settings read once from a raw configuration map.
+ *
+ * <p>Every value is converted with {@link String#valueOf(Object)}. A key that is missing
+ * or explicitly mapped to {@code null} falls back to its default, so a {@code null}
+ * entry never turns into the literal string {@code "null"}.
+ */
+public class JdbcBaseConfig {
+
+    private final String database;
+
+    private final String table;
+
+    /** {@code database + Constants.DOTS + table}, computed once in the constructor. */
+    private final String dbTable;
+
+    private final String url;
+
+    private final String user;
+
+    private final String password;
+
+    private final String driver;
+
+    /**
+     * Extracts the JDBC settings from {@code config}.
+     *
+     * @param config configuration map; must not be {@code null}
+     */
+    public JdbcBaseConfig(Map config) {
+        database = valueOrDefault(config, DATABASE, DEFAULT_DATABASE);
+        table = valueOrDefault(config, TABLE, EMPTY);
+        dbTable = database + Constants.DOTS + table;
+        url = valueOrDefault(config, URL, EMPTY);
+        user = valueOrDefault(config, USER, EMPTY);
+        password = valueOrDefault(config, PASSWORD, EMPTY);
+        driver = valueOrDefault(config, DRIVER, DEFAULT_DRIVER);
+    }
+
+    /**
+     * Returns the string form of {@code config.get(key)}, or of {@code defaultValue} when
+     * the key is absent or mapped to {@code null}.
+     */
+    private static String valueOrDefault(Map config, Object key, Object defaultValue) {
+        Object value = config.get(key);
+        return String.valueOf(value == null ? defaultValue : value);
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getDbTable() {
+        return dbTable;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getDriver() {
+        return driver;
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactory.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..1fc9c5faf032207941f20504c42649aa17a8a21c
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+import org.apache.dolphinscheduler.data.quality.enums.ConnectorType;
+import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds the {@link IConnector} instances described by a {@link DataQualityContext}.
+ */
+public class ConnectorFactory {
+
+    /** Holds the shared instance; it is created when this nested class is first used. */
+    private static class Singleton {
+        private static final ConnectorFactory INSTANCE = new ConnectorFactory();
+    }
+
+    /**
+     * @return the shared factory instance
+     */
+    public static ConnectorFactory getInstance() {
+        return Singleton.INSTANCE;
+    }
+
+    /**
+     * Creates one connector per connector parameter in {@code context}.
+     *
+     * <p>Parameters whose type string is not recognised by
+     * {@link ConnectorType#getType} (it returns {@code null}) are skipped.
+     *
+     * @param context supplies the Spark session and the connector parameters
+     * @return the created connectors, in parameter order; never {@code null}
+     * @throws DataQualityException if a recognised type has no connector implementation
+     */
+    public List<IConnector> getConnectors(DataQualityContext context) throws DataQualityException {
+
+        List<IConnector> connectorList = new ArrayList<>();
+
+        for (ConnectorParameter connectorParameter : context.getConnectorParameterList()) {
+            IConnector connector = getConnector(context.getSparkSession(), connectorParameter);
+            if (connector != null) {
+                connectorList.add(connector);
+            }
+        }
+
+        return connectorList;
+    }
+
+    /**
+     * Maps a single parameter to its connector.
+     *
+     * @return the connector, or {@code null} when the type string is not recognised
+     * @throws DataQualityException if the type is recognised but not handled here
+     */
+    private IConnector getConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) throws DataQualityException {
+        ConnectorType connectorType = ConnectorType.getType(connectorParameter.getType());
+        if (connectorType == null) {
+            return null;
+        }
+
+        switch (connectorType) {
+            case HIVE:
+                return new HiveConnector(sparkSession, connectorParameter);
+            case JDBC:
+                return new JdbcConnector(sparkSession, connectorParameter);
+            default:
+                // Java has no string interpolation: concatenate so the type shows up in the message.
+                throw new DataQualityException("connector type " + connectorType + " is not supported!");
+        }
+    }
+
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/HiveConnector.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/HiveConnector.java
new file mode 100644
index 0000000000000000000000000000000000000000..64d1d27f334864338873cf16e6bc7aefaa3fdc81
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/HiveConnector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
+import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
+
+import org.apache.dolphinscheduler.data.quality.Constants;
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Map;
+
+/**
+ * Connector that exposes a Hive table to later steps as a Spark temporary view.
+ */
+public class HiveConnector implements IConnector {
+
+    private final SparkSession sparkSession;
+
+    private final ConnectorParameter connectorParameter;
+
+    public HiveConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) {
+        this.sparkSession = sparkSession;
+        this.connectorParameter = connectorParameter;
+    }
+
+    /**
+     * Looks up the configured database (default {@code DEFAULT_DATABASE}) and table
+     * (default {@code EMPTY}), then registers {@code database + DOTS + table} as a
+     * temporary view named after the bare table name.
+     */
+    @Override
+    public void execute() {
+        final Map settings = connectorParameter.getConfig();
+        final String viewName = String.valueOf(settings.getOrDefault(TABLE, EMPTY));
+        final String qualifiedName =
+                String.valueOf(settings.getOrDefault(DATABASE, DEFAULT_DATABASE)) + Constants.DOTS + viewName;
+
+        sparkSession.table(qualifiedName).createOrReplaceTempView(viewName);
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/IConnector.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/IConnector.java
new file mode 100644
index 0000000000000000000000000000000000000000..8a56cb453bc5eb4214fb8abac5e15bf863e2f4df
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/IConnector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
+
+/**
+ * Marker sub-interface of {@link DataQualityTask} for connector steps.
+ *
+ * <p>It declares no methods of its own; it is the type returned by
+ * {@code ConnectorFactory} and implemented by {@code HiveConnector} and
+ * {@code JdbcConnector}.
+ */
+public interface IConnector extends DataQualityTask {
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnector.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnector.java
new file mode 100644
index 0000000000000000000000000000000000000000..1be94ae50eb94e4cab57fe05ae55e08f037f1585
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig;
+import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils;
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Map;
+
+/**
+ * Connector that loads a table over JDBC and registers it as a Spark temporary view.
+ */
+public class JdbcConnector implements IConnector {
+
+    private final SparkSession sparkSession;
+
+    private final ConnectorParameter connectorParameter;
+
+    public JdbcConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) {
+        this.sparkSession = sparkSession;
+        this.connectorParameter = connectorParameter;
+    }
+
+    /**
+     * Reads {@link JdbcBaseConfig#getDbTable()} through Spark's {@code jdbc} format and
+     * registers the result as a temporary view named {@link JdbcBaseConfig#getTable()}.
+     *
+     * @throws IllegalArgumentException if the configured driver class cannot be loaded
+     */
+    @Override
+    public void execute() {
+
+        Map config = connectorParameter.getConfig();
+        JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config);
+        String driver = jdbcBaseConfig.getDriver();
+
+        // Java has no string interpolation: concatenate so the driver name shows up in the message.
+        Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(driver),
+                "JDBC driver " + driver + " not present in classpath");
+
+        sparkSession
+                .read()
+                .format("jdbc")
+                .option("driver", driver)
+                .option("url", jdbcBaseConfig.getUrl())
+                .option("dbtable", jdbcBaseConfig.getDbTable())
+                .option("user", jdbcBaseConfig.getUser())
+                .option("password", jdbcBaseConfig.getPassword())
+                .load().createOrReplaceTempView(jdbcBaseConfig.getTable());
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/executor/SparkSqlExecuteTask.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/executor/SparkSqlExecuteTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..3c8c60f60889a27e268db5487fcc408088b02dfc
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/executor/SparkSqlExecuteTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.executor;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ExecutorParameter;
+import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
+import org.apache.dolphinscheduler.data.quality.utils.StringUtils;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a list of Spark SQL statements, registering each result as a temporary view.
+ */
+public class SparkSqlExecuteTask implements DataQualityTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparkSqlExecuteTask.class);
+
+    private final SparkSession sparkSession;
+
+    // Typed list: iterating a raw List with an ExecutorParameter loop variable does not compile.
+    private final List<ExecutorParameter> executorParameterList;
+
+    /**
+     * @param sparkSession session used to run the statements
+     * @param executorParameterList statements to run, in order
+     */
+    public SparkSqlExecuteTask(SparkSession sparkSession, List<ExecutorParameter> executorParameterList) {
+        this.sparkSession = sparkSession;
+        this.executorParameterList = executorParameterList;
+    }
+
+    /**
+     * Executes every parameter's SQL in list order and registers the result under the
+     * parameter's table alias. A parameter with an empty alias is not executed; an error
+     * is logged and processing continues with the next one.
+     */
+    @Override
+    public void execute() {
+        for (ExecutorParameter executorParameter : executorParameterList) {
+            if (StringUtils.isNotEmpty(executorParameter.getTableAlias())) {
+                sparkSession
+                        .sql(executorParameter.getExecuteSql())
+                        .createOrReplaceTempView(executorParameter.getTableAlias());
+            } else {
+                logger.error("lost table alias");
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/IWriter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/IWriter.java
new file mode 100644
index 0000000000000000000000000000000000000000..e62e893da4d0a4f7a8b860d798b15642d9dcb8b6
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/IWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.writer;
+
+import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask;
+
+/**
+ * Marker sub-interface of {@link DataQualityTask} for writer steps.
+ *
+ * <p>It declares no methods of its own; it is the type returned by
+ * {@code WriterFactory} and implemented by {@code JdbcWriter}.
+ */
+public interface IWriter extends DataQualityTask {
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriter.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriter.java
new file mode 100644
index 0000000000000000000000000000000000000000..2ae4e834926825f63273c2d525579d3cb7d106d7
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.writer;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY;
+import static org.apache.dolphinscheduler.data.quality.Constants.SQL;
+
+import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
+import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig;
+import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils;
+import org.apache.dolphinscheduler.data.quality.utils.Preconditions;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Map;
+
+/**
+ * Writer that runs a Spark SQL query and appends its result to a table over JDBC.
+ */
+public class JdbcWriter implements IWriter {
+
+    private final SparkSession sparkSession;
+
+    private final WriterParameter writerParam;
+
+    public JdbcWriter(SparkSession sparkSession, WriterParameter writerParam) {
+        this.sparkSession = sparkSession;
+        this.writerParam = writerParam;
+    }
+
+    /**
+     * Executes the configured {@code SQL} entry (default {@code EMPTY}) and writes the
+     * result with {@link SaveMode#Append} through Spark's {@code jdbc} format.
+     *
+     * @throws IllegalArgumentException if the configured driver class cannot be loaded
+     */
+    @Override
+    public void execute() {
+
+        Map config = writerParam.getConfig();
+        JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config);
+        String sql = String.valueOf(config.getOrDefault(SQL, EMPTY));
+        String driver = jdbcBaseConfig.getDriver();
+
+        // Java has no string interpolation: concatenate so the driver name shows up in the message.
+        Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(driver),
+                "JDBC driver " + driver + " not present in classpath");
+
+        // NOTE(review): the target is the bare table name, while JdbcConnector reads from
+        // getDbTable() (database-qualified) - confirm this difference is intended.
+        sparkSession.sql(sql)
+                .write()
+                .format("jdbc")
+                .option("driver", driver)
+                .option("url", jdbcBaseConfig.getUrl())
+                .option("dbtable", jdbcBaseConfig.getTable())
+                .option("user", jdbcBaseConfig.getUser())
+                .option("password", jdbcBaseConfig.getPassword())
+                .mode(SaveMode.Append)
+                .save();
+    }
+
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactory.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..f067a776d831c8c751485eb2ddcd98b265b6a4ff
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.writer;
+
+import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+import org.apache.dolphinscheduler.data.quality.enums.WriterType;
+import org.apache.dolphinscheduler.data.quality.exception.DataQualityException;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds the {@link IWriter} instances described by a {@link DataQualityContext}.
+ */
+public class WriterFactory {
+
+    /** Holds the shared instance; it is created when this nested class is first used. */
+    private static class Singleton {
+        private static final WriterFactory INSTANCE = new WriterFactory();
+    }
+
+    /**
+     * @return the shared factory instance
+     */
+    public static WriterFactory getInstance() {
+        return Singleton.INSTANCE;
+    }
+
+    /**
+     * Creates one writer per writer parameter in {@code context}.
+     *
+     * <p>Parameters whose type string is not recognised by {@link WriterType#getType}
+     * (it returns {@code null}) are skipped.
+     *
+     * @param context supplies the Spark session and the writer parameters
+     * @return the created writers, in parameter order; never {@code null}
+     * @throws DataQualityException if a recognised type has no writer implementation
+     */
+    public List<IWriter> getWriters(DataQualityContext context) throws DataQualityException {
+
+        List<IWriter> writerList = new ArrayList<>();
+
+        for (WriterParameter writerParam : context.getWriterParamList()) {
+            IWriter writer = getWriter(context.getSparkSession(), writerParam);
+            if (writer != null) {
+                writerList.add(writer);
+            }
+        }
+
+        return writerList;
+    }
+
+    /**
+     * Maps a single parameter to its writer.
+     *
+     * @return the writer, or {@code null} when the type string is not recognised
+     * @throws DataQualityException if the type is recognised but is not {@code JDBC}
+     */
+    private IWriter getWriter(SparkSession sparkSession, WriterParameter writerParam) throws DataQualityException {
+        WriterType writerType = WriterType.getType(writerParam.getType());
+        if (writerType == null) {
+            return null;
+        }
+
+        if (writerType == WriterType.JDBC) {
+            return new JdbcWriter(sparkSession, writerParam);
+        }
+        // Java has no string interpolation: concatenate so the type shows up in the message.
+        throw new DataQualityException("writer type " + writerType + " is not supported!");
+    }
+
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JdbcUtils.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JdbcUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..e935ca97dfff72e7ba2500e0d8a1978816e374ec
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JdbcUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for JDBC driver handling.
+ */
+public class JdbcUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(JdbcUtils.class);
+
+    private JdbcUtils() {
+        throw new UnsupportedOperationException("Construct JdbcUtils");
+    }
+
+    /**
+     * Checks whether a JDBC driver class can be loaded by name.
+     *
+     * @param driver fully qualified driver class name; may be {@code null}
+     * @return {@code true} if {@link Class#forName(String)} succeeds; {@code false} if
+     *         {@code driver} is {@code null} or the class is not found
+     */
+    public static boolean isJdbcDriverLoaded(String driver) {
+        if (driver == null) {
+            // Class.forName(null) would throw NullPointerException; report "not loaded" instead.
+            logger.error("JDBC driver name is null");
+            return false;
+        }
+        try {
+            Class.forName(driver);
+            return true;
+        } catch (ClassNotFoundException e) {
+            // "{}" is the SLF4J placeholder; a trailing Throwable is still logged with its stack trace.
+            logger.error("JDBC driver {} provided is not found in class path", driver, e);
+            return false;
+        }
+    }
+
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JsonUtils.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JsonUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..b6ac9ba545b7fae62414f68d5d7b9e8cfca6811f
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/JsonUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.utils;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Static JSON helpers backed by one shared Jackson {@link ObjectMapper}.
+ *
+ * <p>Both methods catch every exception, log it and return {@code null}, so callers
+ * must check the result.
+ */
+public class JsonUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class);
+
+    /**
+     * Shared mapper, configured once and reused. It does not fail on unknown properties,
+     * accepts an empty array as a null object, reads unknown enum values as null, requires
+     * setters for getters, and uses the JVM default time zone.
+     */
+    private static final ObjectMapper MAPPER = new ObjectMapper()
+            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setTimeZone(TimeZone.getDefault());
+
+    private JsonUtils() {
+        throw new UnsupportedOperationException("Construct JSONUtils");
+    }
+
+    /**
+     * Serializes {@code object} to a JSON string.
+     *
+     * @param object value to serialize
+     * @return the JSON text, or {@code null} if serialization fails (the failure is logged)
+     */
+    public static String toJson(Object object) {
+        try {
+            return MAPPER.writeValueAsString(object);
+        } catch (Exception e) {
+            logger.error("object to json exception!", e);
+        }
+        return null;
+    }
+
+    /**
+     * Deserializes {@code json} into an instance of {@code clazz}.
+     *
+     * @param json JSON text; {@code null} or empty yields {@code null}
+     * @param clazz target type
+     * @param <T> target type
+     * @return the parsed object, or {@code null} if the input is empty or parsing fails
+     *         (the failure is logged)
+     */
+    public static <T> T fromJson(String json, Class<T> clazz) {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return MAPPER.readValue(json, clazz);
+        } catch (Exception e) {
+            logger.error("parse object exception!", e);
+        }
+        return null;
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/Preconditions.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/Preconditions.java
new file mode 100644
index 0000000000000000000000000000000000000000..4ae12f74186a3a1eb027f2449a69ba88ef937cfa
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/Preconditions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.utils;
+
+/**
+ * Argument-validation helpers; not instantiable.
+ */
+public final class Preconditions {
+
+    private Preconditions() {
+        throw new UnsupportedOperationException("Construct Preconditions");
+    }
+
+    /**
+     * Ensures that {@code condition} holds.
+     *
+     * @param condition the check that must be {@code true}
+     * @param errorMsg value whose {@link String#valueOf(Object)} form becomes the exception message
+     * @throws IllegalArgumentException if {@code condition} is {@code false}
+     */
+    public static void checkArgument(boolean condition, Object errorMsg) {
+        if (condition) {
+            return;
+        }
+        final String message = String.valueOf(errorMsg);
+        throw new IllegalArgumentException(message);
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/StringUtils.java b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/StringUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..6900c36e34cdeb5feebb65a469a810b0daf7bf2d
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/java/org/apache/dolphinscheduler/data/quality/utils/StringUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.utils;
+
+/**
+ * Null-safe string checks; not instantiable.
+ */
+public class StringUtils {
+
+    public static final String EMPTY = "";
+
+    private StringUtils() {
+        throw new IllegalStateException("Construct StringUtils");
+    }
+
+    /**
+     * @return {@code true} if {@code cs} is {@code null} or has length zero
+     */
+    public static boolean isEmpty(final CharSequence cs) {
+        if (cs == null) {
+            return true;
+        }
+        return cs.length() == 0;
+    }
+
+    /**
+     * @return {@code true} if {@code cs} is non-null and has at least one character
+     */
+    public static boolean isNotEmpty(final CharSequence cs) {
+        return cs != null && cs.length() != 0;
+    }
+
+    /**
+     * @return {@code true} if {@code s} is {@code null}, empty, or empty after {@link String#trim()}
+     */
+    public static boolean isBlank(String s) {
+        return isEmpty(s) || s.trim().isEmpty();
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/main/resources/log4j.properties b/dolphinscheduler-data-quality/src/main/resources/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..a05b60ebe9ca700383e762c350bb5d41c8dfecc8
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/main/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Console-only logging: root logger at INFO, written to System.out as "timestamp LEVEL [logger] - message".
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java
new file mode 100644
index 0000000000000000000000000000000000000000..597632897ae5ea8008920872f1be05a1ea52d643
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/SparkApplicationTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+import org.junit.Before;
+
+/**
+ * Base class for tests that need Spark: obtains a local session before every test.
+ */
+public class SparkApplicationTestBase {
+
+    protected SparkSession sparkSession;
+
+    @Before
+    public void before() {
+        // Same configuration for every test: fixed app name, cross joins enabled.
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("data quality test")
+                .set("spark.sql.crossJoin.enabled", "true");
+        SparkSession.Builder sessionBuilder = SparkSession.builder().master("local[4]");
+        sparkSession = sessionBuilder.config(sparkConf).getOrCreate();
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/configuration/ConfigurationParserTest.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/configuration/ConfigurationParserTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..97935f05ebc7cf87f0d026d4bf3e4a9152ded4a8
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/configuration/ConfigurationParserTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.configuration;
+
+import org.apache.dolphinscheduler.data.quality.utils.JsonUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * ConfigurationParserTest: parses a sample JSON configuration and validates it.
+ */
+public class ConfigurationParserTest {
+
+    /**
+     * The sample configuration must deserialize and pass {@code validate()}.
+     * Exceptions propagate to JUnit, so a failure reports its real cause
+     * instead of a bare 0/1 flag mismatch.
+     */
+    @Test
+    public void testConfigurationValidate() throws Exception {
+        DataQualityConfiguration dataQualityConfiguration =
+                JsonUtils.fromJson(buildParameterJson(), DataQualityConfiguration.class);
+        Assert.assertNotNull("sample configuration was not parsed", dataQualityConfiguration);
+        dataQualityConfiguration.validate();
+    }
+
+    /** Sample configuration JSON: one JDBC connector, one JDBC writer and one executor. */
+    private static String buildParameterJson() {
+        return "{\n"
+                + "\t\"name\": \"\\u81EA\\u5B9A\\u4E49SQL\",\n"
+                + "\t\"connectors\": [{\n"
+                + "\t\t\"type\": \"JDBC\",\n"
+                + "\t\t\"config\": {\n"
+                + "\t\t\t\"database\": \"test\",\n"
+                + "\t\t\t\"password\": \"123456\",\n"
+                + "\t\t\t\"driver\": \"com.mysql.jdbc.Driver\",\n"
+                + "\t\t\t\"user\": \"test\",\n"
+                + "\t\t\t\"table\": \"test1\",\n"
+                + "\t\t\t\"url\": \"jdbc:mysql://localhost:3306/test\"\n"
+                + "\t\t}\n"
+                + "\t}],\n"
+                + "\t\"writers\": [{\n"
+                + "\t\t\"type\": \"JDBC\",\n"
+                + "\t\t\"config\": {\n"
+                + "\t\t\t\"database\": \"dolphinscheduler\",\n"
+                + "\t\t\t\"password\": \"Test@123!\",\n"
+                + "\t\t\t\"driver\": \"com.mysql.jdbc.Driver\",\n"
+                + "\t\t\t\"user\": \"test\",\n"
+                + "\t\t\t\"table\": \"t_ds_dqs_result\",\n"
+                + "\t\t\t\"url\": \"jdbc:mysql://localhost:3306/dolphinscheduler?characterEncoding=UTF-8&allowMultiQueries=true\",\n"
+                + "\t\t\t\"sql\": \"SELECT 1 as rule_type,"
+                + "'\\u81EA\\u5B9A\\u4E49SQL' as rule_name,"
+                + "18 as process_definition_id,"
+                + "64 as process_instance_id,"
+                + "70 as task_instance_id,"
+                + "mySum AS statistics_value, "
+                + "total_count.total AS comparison_value,"
+                + "0 as check_type,"
+                + "6 as threshold, "
+                + "0 as operator, "
+                + "0 as failure_strategy, "
+                + "'2021-01-31 15:00:07' as create_time,"
+                + "'2021-01-31 15:00:07' as update_time from ( select sum(c4) as mySum from test1 ) tmp1 join total_count\"\n"
+                + "\t\t}\n"
+                + "\t}],\n"
+                + "\t\"executors\": [{\n"
+                + "\t\t\"index\": \"1\",\n"
+                + "\t\t\"execute.sql\": \"SELECT COUNT(*) AS total FROM test1 WHERE (c3 != '55')\",\n"
+                + "\t\t\"table.alias\": \"total_count\"\n"
+                + "\t}]\n"
+                + "}";
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/FlowTestBase.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/FlowTestBase.java
new file mode 100644
index 0000000000000000000000000000000000000000..823e9d55cd083c68f938232b26860cea32153b30
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/FlowTestBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow;
+
+import org.apache.dolphinscheduler.data.quality.SparkApplicationTestBase;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+/**
+ * Base class for flow tests: adds H2 in-memory JDBC settings on top of the Spark test base.
+ */
+public class FlowTestBase extends SparkApplicationTestBase {
+
+    protected String url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1";
+    protected String driver = "org.h2.Driver";
+
+    /** Registers the H2 driver and opens a connection to {@link #url} as user "test". */
+    protected Connection getConnection() throws Exception {
+        DriverManager.registerDriver(new org.h2.Driver());
+        Class.forName(driver, false, this.getClass().getClassLoader());
+
+        Properties connectionProps = new Properties();
+        connectionProps.setProperty("user", "test");
+        connectionProps.setProperty("password", "123456");
+        connectionProps.setProperty("rowId", "false");
+        return DriverManager.getConnection(url, connectionProps);
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactoryTest.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactoryTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..f3d94ecbdb859e70e952bd818c93941caa531aa7
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/ConnectorFactoryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
+import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
+import static org.apache.dolphinscheduler.data.quality.Constants.URL;
+import static org.apache.dolphinscheduler.data.quality.Constants.USER;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * ConnectorFactoryTest: checks that ConnectorFactory builds connectors from a context.
+ */
+public class ConnectorFactoryTest {
+
+    /**
+     * One JDBC connector parameter must yield at least one connector. Exceptions propagate
+     * to JUnit so a failure shows its real cause.
+     */
+    @Test
+    public void testConnectorGenerate() throws Exception {
+        Map config = new HashMap<>();
+        config.put(DATABASE, "test");
+        config.put(TABLE, "test1");
+        config.put(URL, "jdbc:mysql://localhost:3306/test");
+        config.put(USER, "test");
+        config.put(PASSWORD, "123456");
+        config.put(DRIVER, "com.mysql.jdbc.Driver");
+
+        ConnectorParameter connectorParameter = new ConnectorParameter();
+        connectorParameter.setType("JDBC");
+        // The populated config must stay in place; resetting it to null afterwards would
+        // mean the settings above are never handed to the factory.
+        connectorParameter.setConfig(config);
+
+        List connectorParameters = new ArrayList<>();
+        connectorParameters.add(connectorParameter);
+
+        DataQualityContext context = new DataQualityContext();
+        context.setConnectorParameterList(connectorParameters);
+
+        List connectors = ConnectorFactory.getInstance().getConnectors(context);
+
+        Assert.assertNotNull("factory returned null instead of a connector list", connectors);
+        Assert.assertFalse("factory returned no connectors", connectors.isEmpty());
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnectorTest.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnectorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..9cd6cfc589132d36b9d4fcec7852180e7fd9eb6a
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/connector/JdbcConnectorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.connector;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
+import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
+import static org.apache.dolphinscheduler.data.quality.Constants.URL;
+import static org.apache.dolphinscheduler.data.quality.Constants.USER;
+
+import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter;
+import org.apache.dolphinscheduler.data.quality.flow.FlowTestBase;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * JdbcConnectorTest: runs JdbcConnector against an H2 table that is recreated before each test.
+ */
+public class JdbcConnectorTest extends FlowTestBase {
+
+    @Before
+    public void before() {
+        super.before();
+        createConnectorTable();
+    }
+
+    @Test
+    public void testJdbcConnectorExecute() {
+        JdbcConnector jdbcConnector = new JdbcConnector(sparkSession, buildConnectorParameter());
+        jdbcConnector.execute();
+    }
+
+    /** Builds a JDBC connector parameter for table test1, using the inherited url and driver. */
+    private ConnectorParameter buildConnectorParameter() {
+        Map config = new HashMap<>();
+        config.put(DATABASE, "test");
+        config.put(TABLE, "test1");
+        config.put(URL, url);
+        config.put(USER, "test");
+        config.put(PASSWORD, "123456");
+        config.put(DRIVER, driver);
+        ConnectorParameter connectorParameter = new ConnectorParameter();
+        connectorParameter.setType("JDBC");
+        connectorParameter.setConfig(config);
+        return connectorParameter;
+    }
+
+    /**
+     * Recreates test.test1 with six rows. The connection is closed when done (which also
+     * releases its statements), and any failure aborts the test instead of being ignored.
+     */
+    private void createConnectorTable() {
+        try (Connection connection = getConnection()) {
+            connection.prepareStatement("create schema if not exists test").executeUpdate();
+            connection.prepareStatement("drop table if exists test.test1").executeUpdate();
+            connection.prepareStatement(
+                    "CREATE TABLE test.test1 (\n"
+                    + " `id` int(11) NOT NULL AUTO_INCREMENT,\n"
+                    + " `company` varchar(255) DEFAULT NULL,\n"
+                    + " `date` varchar(255) DEFAULT NULL,\n"
+                    + " `c1` varchar(255) DEFAULT NULL,\n"
+                    + " `c2` varchar(255) DEFAULT NULL,\n"
+                    + " `c3` varchar(255) DEFAULT NULL,\n"
+                    + " `c4` int(11) DEFAULT NULL,\n"
+                    + " PRIMARY KEY (`id`)\n"
+                    + ")").executeUpdate();
+            connection.prepareStatement("INSERT INTO test.test1 (company,`date`,c1,c2,c3,c4) VALUES\n"
+                    + "\t ('1','2019-03-01','11','12','13',1),\n"
+                    + "\t ('2','2019-06-01','21','22','23',1),\n"
+                    + "\t ('3','2019-09-01','31','32','33',1),\n"
+                    + "\t ('4','2019-12-01','41','42','43',1),\n"
+                    + "\t ('5','2013','42','43','54',1),\n"
+                    + "\t ('6','2020','42','43','54',1);").executeUpdate();
+            connection.commit();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to prepare table test.test1", e);
+        }
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriterTest.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriterTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..65357374c4138654bff3706c104ae0ed45379982
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/JdbcWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.writer;
+
+import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE;
+import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER;
+import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.data.quality.Constants.TABLE;
+import static org.apache.dolphinscheduler.data.quality.Constants.URL;
+import static org.apache.dolphinscheduler.data.quality.Constants.USER;
+
+import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
+import org.apache.dolphinscheduler.data.quality.flow.FlowTestBase;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * JdbcWriterTest: runs JdbcWriter against an H2 table that is recreated before each test.
+ */
+public class JdbcWriterTest extends FlowTestBase {
+
+    @Before
+    public void before() {
+        super.before();
+        createWriterTable();
+    }
+
+    @Test
+    public void testJdbcWriterExecute() {
+        JdbcWriter jdbcWriter = new JdbcWriter(sparkSession, buildWriterParameter());
+        jdbcWriter.execute();
+    }
+
+    /** Builds a JDBC writer parameter for table test2 with a one-row SELECT as its sql. */
+    private WriterParameter buildWriterParameter() {
+        Map config = new HashMap<>();
+        config.put(DATABASE, "test");
+        config.put(TABLE, "test2");
+        config.put(URL, url);
+        config.put(USER, "test");
+        config.put(PASSWORD, "123456");
+        config.put(DRIVER, driver);
+        config.put("sql", "SELECT '1' as company,'1' as date,'2' as c1,'2' as c2,'2' as c3, 2 as c4");
+        WriterParameter writerParameter = new WriterParameter();
+        writerParameter.setType("JDBC");
+        writerParameter.setConfig(config);
+        return writerParameter;
+    }
+
+    /**
+     * Recreates the empty table test.test2. The connection is closed when done (which also
+     * releases its statements), and any failure aborts the test instead of being ignored.
+     */
+    private void createWriterTable() {
+        try (Connection connection = getConnection()) {
+            connection.prepareStatement("create schema if not exists test").executeUpdate();
+            connection.prepareStatement("drop table if exists test.test2").executeUpdate();
+            connection.prepareStatement(
+                    "CREATE TABLE test.test2 (\n"
+                    + " `id` int(11) NOT NULL AUTO_INCREMENT,\n"
+                    + " `company` varchar(255) DEFAULT NULL,\n"
+                    + " `date` varchar(255) DEFAULT NULL,\n"
+                    + " `c1` varchar(255) DEFAULT NULL,\n"
+                    + " `c2` varchar(255) DEFAULT NULL,\n"
+                    + " `c3` varchar(255) DEFAULT NULL,\n"
+                    + " `c4` int(11) DEFAULT NULL,\n"
+                    + " PRIMARY KEY (`id`)\n"
+                    + ")").executeUpdate();
+            connection.prepareStatement("set schema test").executeUpdate();
+            connection.commit();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to prepare table test.test2", e);
+        }
+    }
+}
diff --git a/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactoryTest.java b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactoryTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..eea66e1b5f15aa14bc82d7294dca2d8a623d0261
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/java/org/apache/dolphinscheduler/data/quality/flow/writer/WriterFactoryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.data.quality.flow.writer;
+
+import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter;
+import org.apache.dolphinscheduler.data.quality.context.DataQualityContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * WriterFactoryTest: checks that WriterFactory builds writers from a context.
+ */
+public class WriterFactoryTest {
+
+    /**
+     * One JDBC writer parameter (with a null config) must yield at least one writer.
+     * Exceptions propagate to JUnit so a failure shows its real cause.
+     */
+    @Test
+    public void testWriterGenerate() throws Exception {
+        WriterParameter writerParameter = new WriterParameter();
+        writerParameter.setType("JDBC");
+        writerParameter.setConfig(null);
+
+        List writerParameters = new ArrayList<>();
+        writerParameters.add(writerParameter);
+
+        DataQualityContext context = new DataQualityContext();
+        context.setWriterParamList(writerParameters);
+
+        // No try/catch around the factory call: swallowing the exception would hide
+        // why writer creation failed.
+        List writers = WriterFactory.getInstance().getWriters(context);
+
+        Assert.assertNotNull("factory returned null instead of a writer list", writers);
+        Assert.assertFalse("factory returned no writers", writers.isEmpty());
+    }
+}
diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml
index 1ec74fd409f45edb36916bd962edd33cb22cd425..cb8ad52b0d4a489f8a8ce3f48e2556e852f4f4e0 100644
--- a/dolphinscheduler-dist/pom.xml
+++ b/dolphinscheduler-dist/pom.xml
@@ -41,6 +41,11 @@
org.apache.dolphinscheduler
dolphinscheduler-api
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-data-quality
+
diff --git a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
index 34f2cd2a9a96400ee40daf07bd8bf44b35a82c77..b050bbb3d805b0b6aff54c536e39387cd147c616 100644
--- a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
+++ b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
@@ -151,6 +151,14 @@
.
+
+ ${basedir}/../dolphinscheduler-data-quality/target/dolphinscheduler-data-quality-${project.version}
+
+ **/*.*
+
+ .
+
+
${basedir}/../dolphinscheduler-ui/dist
diff --git a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml
index b47a0ae3935c985a90ececaa1a87d4bfdd2db352..b4d34e04fc3486a447da21be0fddfda1e548bc36 100644
--- a/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml
+++ b/dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml
@@ -151,6 +151,14 @@
.
+
+ ${basedir}/../dolphinscheduler-data-quality/target/dolphinscheduler-data-quality-${project.version}
+
+ **/*.*
+
+ .
+
+
${basedir}/../dolphinscheduler-ui/dist
diff --git a/pom.xml b/pom.xml
index 6d7446f8d13b9049feb737e9596abdf3b76a730d..dfb43748c1f20b0567ba11490bc96f79d95a80e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,8 @@
2.0.0
0.184
${dep.airlift.version}
+ 2.11
+ 2.4.0
@@ -241,6 +243,11 @@
dolphinscheduler-spi
${project.version}
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-data-quality
+ ${project.version}
+
org.apache.curator
@@ -355,11 +362,13 @@
${mysql.connector.version}
test
+
com.h2database
h2
${h2.version}
+
org.slf4j
slf4j-api
@@ -592,6 +601,34 @@
1.6.2
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+
+
+
+ org.apache.spark
+ spark-hive_${scala.binary.version}
+ ${spark.version}
+
+
+ commons-httpclient
+ commons-httpclient
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
+
@@ -987,6 +1024,11 @@
**/alert/processor/AlertRequestProcessorTest.java
**/alert/runner/AlertSenderTest.java
**/alert/AlertServerTest.java
+ **/data/quality/configuration/ConfigurationParserTest.java
+ **/data/quality/flow/connector/ConnectorFactoryTest.java
+ **/data/quality/flow/connector/JdbcConnectorTest.java
+ **/data/quality/flow/writer/WriterFactoryTest.java
+ **/data/quality/flow/writer/JdbcWriterTest.java
@@ -1105,5 +1147,6 @@
dolphinscheduler-service
dolphinscheduler-spi
dolphinscheduler-microbench
+ dolphinscheduler-data-quality