Unverified commit d531e285, authored by avalon5666 and committed by GitHub

Fix insert failed (#8164)

Parent 37bee90a
......@@ -39,6 +39,8 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
......@@ -61,15 +63,16 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
sqlBuilder = createSQLBuilder();
sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
}
/**
* Create SQL builder.
*
* @param shardingColumnsMap sharding columns map
* @return SQL builder
*/
protected abstract AbstractSQLBuilder createSQLBuilder();
protected abstract AbstractSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
@Override
public final void start() {
......
......@@ -18,17 +18,23 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Collections2;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Abstract SQL builder.
*/
@RequiredArgsConstructor
public abstract class AbstractSQLBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
......@@ -39,6 +45,9 @@ public abstract class AbstractSQLBuilder {
private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
@Getter(AccessLevel.PROTECTED)
private final Map<String, Set<String>> shardingColumnsMap;
/**
* Get left identifier quote string.
*
......
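The sharding-columns map is now a final field on AbstractSQLBuilder, so Lombok's @RequiredArgsConstructor produces the constructor that the subclasses further down invoke via super(...); sqlCacheMap is excluded because it is initialized at declaration. A rough hand-written equivalent of what Lombok generates here, shown only as a sketch for illustration and not part of the commit:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public abstract class AbstractSQLBuilder {

        private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();

        private final Map<String, Set<String>> shardingColumnsMap;

        // Generated by @RequiredArgsConstructor: one parameter per final field without an initializer.
        public AbstractSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
            this.shardingColumnsMap = shardingColumnsMap;
        }

        // Generated by @Getter(AccessLevel.PROTECTED); used by the MySQL builder below.
        protected Map<String, Set<String>> getShardingColumnsMap() {
            return shardingColumnsMap;
        }
    }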
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.utils;
import java.util.Map;
import java.util.Set;
/**
* Sharding columns util.
*/
public final class ShardingColumnsUtil {
/**
* Judge whether the column is a sharding column.
*
* @param shardingColumnsMap sharding columns map
* @param tableName table name
* @param columnName column name
* @return true if the column is a sharding column of the table, false otherwise
*/
public static boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap,
final String tableName, final String columnName) {
return shardingColumnsMap.containsKey(tableName)
&& shardingColumnsMap.get(tableName).contains(columnName);
}
}
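For reference, a minimal sketch of how the new ShardingColumnsUtil.isShardingColumn check behaves; the map contents below (table t_order with sharding column order_id) are illustrative assumptions, not taken from the commit:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Sets;
    import org.apache.shardingsphere.scaling.core.utils.ShardingColumnsUtil;
    import java.util.Map;
    import java.util.Set;

    public final class ShardingColumnsUtilExample {

        public static void main(final String[] args) {
            // Hypothetical mapping of table name -> sharding column names.
            Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder()
                    .put("t_order", Sets.newHashSet("order_id")).build();
            // true: order_id is registered as a sharding column of t_order.
            System.out.println(ShardingColumnsUtil.isShardingColumn(shardingColumnsMap, "t_order", "order_id"));
            // false: status is an ordinary column of t_order.
            System.out.println(ShardingColumnsUtil.isShardingColumn(shardingColumnsMap, "t_order", "status"));
            // false: the map has no entry for t_user at all.
            System.out.println(ShardingColumnsUtil.isShardingColumn(shardingColumnsMap, "t_user", "user_id"));
        }
    }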
......@@ -90,7 +90,7 @@ public final class AbstractJDBCImporterTest {
jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(), dataSourceManager) {
@Override
protected AbstractSQLBuilder createSQLBuilder() {
protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
return sqlBuilder;
}
};
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
......@@ -36,7 +37,7 @@ public class AbstractSqlBuilderTest {
@Before
public void setUp() {
sqlBuilder = new AbstractSQLBuilder() {
sqlBuilder = new AbstractSQLBuilder(Maps.newHashMap()) {
@Override
protected String getLeftIdentifierQuoteString() {
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.fixture;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
......@@ -44,7 +45,7 @@ public final class FixtureDataConsistencyChecker extends AbstractDataConsistency
@Override
protected AbstractSQLBuilder getSqlBuilder() {
return new AbstractSQLBuilder() {
return new AbstractSQLBuilder(Maps.newHashMap()) {
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.mysql;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
......@@ -97,6 +98,6 @@ public final class MySQLDataConsistencyChecker extends AbstractDataConsistencyCh
@Override
protected MySQLSQLBuilder getSqlBuilder() {
return new MySQLSQLBuilder();
return new MySQLSQLBuilder(Maps.newHashMap());
}
}
......@@ -22,6 +22,9 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import java.util.Map;
import java.util.Set;
/**
* MySQL importer.
*/
......@@ -32,7 +35,7 @@ public final class MySQLImporter extends AbstractJDBCImporter {
}
@Override
protected AbstractSQLBuilder createSQLBuilder() {
return new MySQLSQLBuilder();
protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
return new MySQLSQLBuilder(shardingColumnsMap);
}
}
......@@ -20,12 +20,20 @@ package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.utils.ShardingColumnsUtil;
import java.util.Map;
import java.util.Set;
/**
* MySQL SQL builder.
*/
public final class MySQLSQLBuilder extends AbstractSQLBuilder {
public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@Override
public String getLeftIdentifierQuoteString() {
return "`";
......@@ -45,9 +53,11 @@ public final class MySQLSQLBuilder extends AbstractSQLBuilder {
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
if (!dataRecord.getColumn(i).isPrimaryKey()) {
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
if (column.isPrimaryKey() || ShardingColumnsUtil.isShardingColumn(
getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
}
result.setLength(result.length() - 1);
return result.toString();
......
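The core of the fix is above: the ON DUPLICATE KEY UPDATE clause no longer assigns sharding columns, presumably because a statement that updates a sharding column is rejected at routing time, which appears to be the insert failure the commit title refers to. A dependency-free sketch of the same clause-building logic, using plain strings instead of the project's DataRecord/Column types; table and column names are illustrative:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public final class ConflictClauseSketch {

        static String buildConflictSQL(final List<String> columnNames, final Set<String> primaryKeys, final Set<String> shardingColumns) {
            StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
            for (String each : columnNames) {
                // Skip primary-key and sharding columns, mirroring the check added in MySQLSQLBuilder.
                if (primaryKeys.contains(each) || shardingColumns.contains(each)) {
                    continue;
                }
                result.append('`').append(each).append("`=VALUES(`").append(each).append("`),");
            }
            // Drop the trailing comma, as the original code does.
            result.setLength(result.length() - 1);
            return result.toString();
        }

        public static void main(final String[] args) {
            List<String> columnNames = Arrays.asList("id", "sc", "c1", "c2", "c3");
            // Prints: " ON DUPLICATE KEY UPDATE `c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"
            System.out.println(buildConflictSQL(columnNames, new LinkedHashSet<>(Arrays.asList("id")), new LinkedHashSet<>(Arrays.asList("sc"))));
        }
    }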
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.mysql;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
......@@ -42,7 +43,7 @@ public final class MySQLImporterTest {
@Test
public void assertCreateSqlBuilder() {
MySQLImporter mySQLImporter = new MySQLImporter(importerConfig, dataSourceManager);
String insertSQL = mySQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
String insertSQL = mySQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=VALUES(`name`)"));
}
......
......@@ -17,18 +17,23 @@
package org.apache.shardingsphere.scaling.mysql;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Test;
import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class MySQLSQLBuilderTest {
private AbstractSQLBuilder sqlBuilder = new MySQLSQLBuilder();
private AbstractSQLBuilder sqlBuilder = new MySQLSQLBuilder(ImmutableMap.<String, Set<String>>builder()
.put("t2", Sets.newHashSet("sc")).build());
@Test
public void assertBuildInsertSQL() {
......@@ -36,6 +41,12 @@ public class MySQLSQLBuilderTest {
assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `sc`=VALUES(`sc`),`c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
}
@Test
public void assertBuildInsertSQLHasShardingColumn() {
String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t2"));
assertThat(actual, is("INSERT INTO `t2`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
}
private DataRecord mockDataRecord(final String tableName) {
DataRecord result = new DataRecord(new NopPosition(), 4);
result.setTableName(tableName);
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
......@@ -41,6 +42,6 @@ public final class PostgreSQLDataConsistencyChecker extends AbstractDataConsiste
@Override
protected AbstractSQLBuilder getSqlBuilder() {
return new PostgreSQLSQLBuilder();
return new PostgreSQLSQLBuilder(Maps.newHashMap());
}
}
......@@ -22,6 +22,9 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import java.util.Map;
import java.util.Set;
/**
* PostgreSQL importer.
*/
......@@ -32,8 +35,8 @@ public final class PostgreSQLImporter extends AbstractJDBCImporter {
}
@Override
protected AbstractSQLBuilder createSQLBuilder() {
return new PostgreSQLSQLBuilder();
protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
return new PostgreSQLSQLBuilder(shardingColumnsMap);
}
}
......@@ -22,11 +22,18 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import java.util.Map;
import java.util.Set;
/**
* PostgreSQL SQL builder.
*/
public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
public PostgreSQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@Override
public String getLeftIdentifierQuoteString() {
return "\"";
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
......@@ -43,7 +44,7 @@ public final class PostgreSQLImporterTest {
@Test
public void assertCreateSQLBuilder() {
PostgreSQLImporter postgreSQLImporter = new PostgreSQLImporter(importerConfig, dataSourceManager);
String insertSQL = postgreSQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
String insertSQL = postgreSQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
......@@ -30,7 +31,7 @@ public final class PostgreSQLSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
String actual = new PostgreSQLSQLBuilder().buildInsertSQL(mockDataRecord());
String actual = new PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
......