未验证 提交 4e28d946 编写于 作者: 邱鹿 Lucas 提交者: GitHub

add unit test for shardingsphere-scaling-postgresql (#6983)

Co-authored-by: qiulu3 <Lucas209910>
上级 b5cc7209
......@@ -24,8 +24,6 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import javax.sql.DataSource;
import java.sql.Connection;
......@@ -42,7 +40,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLJdbcDumperTest {
private DataSourceManager dataSourceManager;
......
......@@ -40,5 +40,10 @@
<artifactId>postgresql</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -38,7 +38,7 @@ public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper {
@Override
protected PreparedStatement createPreparedStatement(final Connection conn, final String sql) throws SQLException {
PreparedStatement result = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
result.setFetchSize(1);
result.setFetchSize(100);
return result;
}
}
......@@ -33,7 +33,6 @@ import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.decode.TestDecodingPlugin;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.postgresql.PGConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;
......@@ -74,8 +73,8 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private void dump() {
try {
PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
Connection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
while (isRunning()) {
......
......@@ -23,6 +23,7 @@ import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
......@@ -33,16 +34,26 @@ import java.util.Properties;
public final class LogicalReplication {
/**
* Create PostgreSQL connection.
* Create PostgreSQL connection.
*
* @param jdbcDataSourceConfig JDBC data source configuration
* @return PostgreSQL connection
* @throws SQLException sql exception
*/
public PGConnection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
public Connection createPgConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
return createConnection(jdbcDataSourceConfig);
}
private Connection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
Properties props = new Properties();
PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props);
}
/**
* Create PostgreSQL replication stream.
*
......@@ -52,8 +63,8 @@ public final class LogicalReplication {
* @return replication stream
* @throws SQLException sql exception
*/
public PGReplicationStream createReplicationStream(final PGConnection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
return pgConnection.getReplicationAPI()
public PGReplicationStream createReplicationStream(final Connection pgConnection, final String slotName, final LogSequenceNumber startPosition) throws SQLException {
return pgConnection.unwrap(PGConnection.class).getReplicationAPI()
.replicationStream()
.logical()
.withStartPosition(startPosition)
......@@ -62,14 +73,4 @@ public final class LogicalReplication {
.withSlotOption("skip-empty-xacts", true)
.start();
}
private PGConnection createConnection(final JDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
Properties props = new Properties();
PGProperty.USER.set(props, jdbcDataSourceConfig.getUsername());
PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
return DriverManager.getConnection(jdbcDataSourceConfig.getJdbcUrl(), props).unwrap(PGConnection.class);
}
}
......@@ -42,12 +42,18 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLDataSourceCheckerTest {
private static final String CATALOG = "test";
@Mock
private Connection connection;
@Mock
private PreparedStatement preparedStatement;
@Mock
private ResultSet resultSet;
@Mock
private DatabaseMetaData metaData;
private Collection<DataSource> dataSources;
......@@ -56,40 +62,47 @@ public final class PostgreSQLDataSourceCheckerTest {
DataSource dataSource = mock(DataSource.class);
Connection connection = mockConnection();
when(dataSource.getConnection()).thenReturn(connection);
when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
dataSources = new LinkedList<>();
dataSources.add(dataSource);
}
@SneakyThrows
private Connection mockConnection() {
DatabaseMetaData metaData = mock(DatabaseMetaData.class);
when(connection.getMetaData()).thenReturn(metaData);
ResultSet resultSet = mockResultSet();
when(metaData.getTables(CATALOG, null, "%", new String[]{"TABLE"})).thenReturn(resultSet);
when(connection.getCatalog()).thenReturn(CATALOG);
when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenReturn(preparedStatement);
return connection;
}
@SneakyThrows
private ResultSet mockResultSet() {
ResultSet resultSet = mock(ResultSet.class);
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(3)).thenReturn("test");
return resultSet;
}
@Test
public void assertCheckPrivilege() throws SQLException {
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(3)).thenReturn("test");
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement).executeQuery();
}
@Test(expected = PrepareFailedException.class)
public void assertCheckPrivilegeFailed() throws SQLException {
when(preparedStatement.executeQuery()).thenThrow(new SQLException());
public void assertCheckPrivilegeWithoutTable() throws SQLException {
when(resultSet.next()).thenReturn(false);
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkPrivilege(dataSources);
}
@Test(expected = PrepareFailedException.class)
public void assertCheckPrivilegeFailure() throws SQLException {
when(resultSet.next()).thenReturn(true);
when(resultSet.getString(3)).thenReturn("test");
when(connection.prepareStatement("SELECT * FROM test LIMIT 1")).thenThrow(new SQLException());
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkPrivilege(dataSources);
}
@Test
public void assertCheckVariable() {
PostgreSQLDataSourceChecker dataSourceChecker = new PostgreSQLDataSourceChecker();
dataSourceChecker.checkVariable(dataSources);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.replication.LogSequenceNumber;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLImporterTest {
@Mock
private ImporterConfiguration importerConfiguration;
@Mock
private DataSourceManager dataSourceManager;
@Test
public void assertCreateSqlBuilder() {
PostgreSQLImporter mySQLImporter = new PostgreSQLImporter(importerConfiguration, dataSourceManager);
String insertSQL = mySQLImporter.createSqlBuilder().buildInsertSQL(mockDataRecord());
assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}
private DataRecord mockDataRecord() {
DataRecord result = new DataRecord(new WalPosition(LogSequenceNumber.valueOf(100L)), 2);
result.setTableName("t_order");
result.addColumn(new Column("id", 1, true, true));
result.addColumn(new Column("name", "", true, false));
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public final class PostgreSQLJdbcDumperTest {
private DataSourceManager dataSourceManager;
private PostgreSQLJdbcDumper postgreSQLJdbcDumper;
@Before
public void setUp() {
dataSourceManager = new DataSourceManager();
postgreSQLJdbcDumper = new PostgreSQLJdbcDumper(mockInventoryDumperConfiguration(), dataSourceManager);
}
private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
DumperConfiguration dumperConfiguration = mockDumperConfiguration();
initTableData(dumperConfiguration);
InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfiguration);
result.setTableName("t_order");
return result;
}
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
@Test
@SneakyThrows(SQLException.class)
public void assertCreatePreparedStatement() {
DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = postgreSQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
assertThat(preparedStatement.getFetchSize(), is(100));
}
}
private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration dumperConfiguration = new DumperConfiguration();
dumperConfiguration.setDataSourceConfiguration(
new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
return dumperConfiguration;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
import org.apache.shardingsphere.scaling.postgresql.utils.ReflectionUtil;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLWalDumperTest {
@Mock
private LogicalReplication logicalReplication;
@Mock
private PgConnection pgConnection;
@Mock
private PGReplicationStream pgReplicationStream;
private WalPosition position;
private PostgreSQLWalDumper postgreSQLWalDumper;
private JDBCDataSourceConfiguration jdbcDataSourceConfiguration;
private MemoryChannel channel;
@Before
public void setUp() {
ScalingContext.getInstance().init(new ServerConfiguration());
position = new WalPosition(LogSequenceNumber.valueOf(100L));
postgreSQLWalDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position);
channel = new MemoryChannel(records -> {
});
postgreSQLWalDumper.setChannel(channel);
}
private DumperConfiguration mockDumperConfiguration() {
jdbcDataSourceConfiguration = new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root");
DumperConfiguration dumperConfiguration = new DumperConfiguration();
dumperConfiguration.setDataSourceConfiguration(jdbcDataSourceConfiguration);
return dumperConfiguration;
}
@Test
@SneakyThrows({ReflectiveOperationException.class, SQLException.class})
public void assertStart() {
try {
ReflectionUtil.setFieldValueToClass(postgreSQLWalDumper, "logicalReplication", logicalReplication);
when(logicalReplication.createPgConnection(jdbcDataSourceConfiguration)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
when(pgConnection.getTimestampUtils()).thenReturn(null);
when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException());
when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
postgreSQLWalDumper.start();
} catch (SyncTaskExecuteException ignore) {
}
assertThat(channel.fetchRecords(100, 0).size(), is(1));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.lang.reflect.Field;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ReflectionUtil {
/**
* Get field from class.
*
* @param targetClass target class
* @param fieldName field name
* @param isDeclared is declared
* @return {@link Field}
* @throws NoSuchFieldException no such field exception
*/
public static Field getFieldFromClass(final Class<?> targetClass, final String fieldName, final boolean isDeclared) throws NoSuchFieldException {
Field targetField;
if (isDeclared) {
targetField = targetClass.getDeclaredField(fieldName);
} else {
targetField = targetClass.getField(fieldName);
}
targetField.setAccessible(true);
return targetField;
}
/**
* Get field value from instance target object.
*
* @param target target object
* @param fieldName field name
* @param valueClass expected value class
* @param <T> expected value class
* @return target filed value
* @throws NoSuchFieldException no such field exception
* @throws IllegalAccessException illegal access exception
*/
@SuppressWarnings("unchecked")
public static <T> T getFieldValueFromClass(final Object target, final String fieldName, final Class<T> valueClass) throws NoSuchFieldException, IllegalAccessException {
Field field = getFieldFromClass(target.getClass(), fieldName, true);
Object value = field.get(target);
if (null == value) {
return null;
}
if (value.getClass().isAssignableFrom(value.getClass())) {
return (T) value;
}
throw new ClassCastException("field " + fieldName + " is " + target.getClass().getName() + " can cast to " + valueClass.getName());
}
/**
* Set value to target object field.
*
* @param target target object
* @param fieldName field name
* @param value new value
* @throws NoSuchFieldException no such field exception
* @throws IllegalAccessException illegal access exception
*/
public static void setFieldValueToClass(final Object target, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
Field field = getFieldFromClass(target.getClass(), fieldName, true);
field.set(target, value);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql.wal;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.PGConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationConnection;
import org.postgresql.replication.fluent.ChainedStreamBuilder;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import java.sql.Connection;
import java.sql.SQLException;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class LogicalReplicationTest {
@Mock
private PgConnection pgConnection;
@Mock
private PGReplicationConnection pgReplicationConnection;
@Mock
private ChainedStreamBuilder chainedStreamBuilder;
@Mock
private ChainedLogicalStreamBuilder chainedLogicalStreamBuilder;
private LogicalReplication logicalReplication;
@Before
public void setUp() {
logicalReplication = new LogicalReplication();
}
@Test
@SneakyThrows(SQLException.class)
public void assertCreatePgConnectionSuccess() {
Connection pgConnection = logicalReplication.createPgConnection(
new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
assertFalse(pgConnection.isClosed());
}
@Test
@SneakyThrows(SQLException.class)
public void assertCreateReplicationStreamSuccess() {
LogSequenceNumber startPosition = LogSequenceNumber.valueOf(100L);
when(pgConnection.unwrap(PGConnection.class)).thenReturn(pgConnection);
when(pgConnection.getReplicationAPI()).thenReturn(pgReplicationConnection);
when(pgReplicationConnection.replicationStream()).thenReturn(chainedStreamBuilder);
when(chainedStreamBuilder.logical()).thenReturn(chainedLogicalStreamBuilder);
when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder);
when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder);
when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
logicalReplication.createReplicationStream(pgConnection, "", startPosition);
verify(chainedLogicalStreamBuilder).start();
}
@Test(expected = SQLException.class)
@SneakyThrows(SQLException.class)
public void assertCreateReplicationStreamFailure() {
when(pgConnection.unwrap(PGConnection.class)).thenThrow(new SQLException());
logicalReplication.createReplicationStream(pgConnection, "", LogSequenceNumber.valueOf(100L));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql.wal;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class WalEventConverterTest {
private WalEventConverter walEventConverter;
@Before
public void setUp() {
DumperConfiguration dumperConfiguration = mockDumperConfiguration();
walEventConverter = new WalEventConverter(dumperConfiguration);
initTableData(dumperConfiguration);
}
private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration reslut = new DumperConfiguration();
reslut.setDataSourceConfiguration(new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
Map<String, String> tableNameMap = Maps.newHashMap();
tableNameMap.put("t_order", "t_order");
reslut.setTableNameMap(tableNameMap);
return reslut;
}
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
DataSource dataSource = new DataSourceManager().getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
@Test
public void assertConvertWriteRowEvent() {
Record record = walEventConverter.convert(mockWriteRowEvent());
assertTrue(record instanceof DataRecord);
assertThat(((DataRecord) record).getType(), is(ScalingConstant.INSERT));
}
@Test
public void assertConvertUpdateRowEvent() {
Record record = walEventConverter.convert(mockUpdateRowEvent());
assertTrue(record instanceof DataRecord);
assertThat(((DataRecord) record).getType(), is(ScalingConstant.UPDATE));
}
@Test
public void assertConvertDeleteRowEvent() {
Record record = walEventConverter.convert(mockDeleteRowEvent());
assertTrue(record instanceof DataRecord);
assertThat(((DataRecord) record).getType(), is(ScalingConstant.DELETE));
}
@Test
public void assertConvertPlaceholderEvent() {
Record record = walEventConverter.convert(new PlaceholderEvent());
assertTrue(record instanceof PlaceholderRecord);
}
@Test
public void assertUnknownTable() {
Record record = walEventConverter.convert(mockUnknownTableEvent());
assertTrue(record instanceof PlaceholderRecord);
}
@Test(expected = UnsupportedOperationException.class)
public void assertConvertFailure() {
walEventConverter.convert(new AbstractRowEvent());
}
private AbstractRowEvent mockWriteRowEvent() {
WriteRowEvent result = new WriteRowEvent();
result.setSchemaName("");
result.setTableName("t_order");
result.setAfterRow(Lists.newArrayList("id", "user_id"));
return result;
}
private AbstractRowEvent mockUpdateRowEvent() {
UpdateRowEvent result = new UpdateRowEvent();
result.setSchemaName("");
result.setTableName("t_order");
result.setAfterRow(Lists.newArrayList("id", "user_id"));
return result;
}
private AbstractRowEvent mockDeleteRowEvent() {
DeleteRowEvent result = new DeleteRowEvent();
result.setSchemaName("");
result.setTableName("t_order");
result.setPrimaryKeys(Lists.newArrayList("id"));
return result;
}
private AbstractRowEvent mockUnknownTableEvent() {
WriteRowEvent result = new WriteRowEvent();
result.setSchemaName("");
result.setTableName("t_other");
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.postgresql.wal;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
import static org.junit.Assert.assertThat;
import static org.hamcrest.CoreMatchers.is;
public final class WalPositionTest {
@Test
public void assertCompareTo() {
WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
assertThat(walPosition.compareTo(null), is(1));
assertThat(walPosition.compareTo(new WalPosition(LogSequenceNumber.valueOf(100L))), is(0));
}
@Test
public void assertToJson() {
WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L));
assertThat(walPosition.toJson().toString(), is("100"));
}
}
......@@ -17,55 +17,86 @@
package org.apache.shardingsphere.scaling.postgresql.wal.decode;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.DeleteRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.UpdateRowEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.WriteRowEvent;
import org.junit.Test;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.replication.LogSequenceNumber;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class TestDecodingPluginTest {
private LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
@Test
public void assertDecodeWriteRowEvent() {
LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[character varying]:'1 2 3'''".getBytes());
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
assertThat(actual.getLogSequenceNumber(), is(lsn));
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
}
@Test
public void assertDecodeUpdateRowEvent() {
LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: data[character varying]:'1 2 3'''".getBytes());
UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
assertThat(actual.getLogSequenceNumber(), is(lsn));
UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
}
@Test
public void assertDecodeDeleteRowEvent() {
LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
assertThat(actual.getLogSequenceNumber(), is(lsn));
DeleteRowEvent actual = (DeleteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getPrimaryKeys().get(0), is(1));
}
@Test
public void assertDecodeWriteRowEventWithByteA() {
LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/14EFDB8");
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[bytea]:'\\xff00ab'".getBytes());
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, lsn);
assertThat(actual.getLogSequenceNumber(), is(lsn));
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
assertThat(actual.getAfterRow().get(0), is(new byte[] {(byte) 0xff, (byte) 0, (byte) 0xab}));
assertThat(actual.getAfterRow().get(0), is(new byte[]{(byte) 0xff, (byte) 0, (byte) 0xab}));
}
@Test
public void assertDecodeUnknownTableType() {
ByteBuffer data = ByteBuffer.wrap("unknown".getBytes());
AbstractWalEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertTrue(actual instanceof PlaceholderEvent);
}
@Test(expected = SyncTaskExecuteException.class)
public void assertDecodeUnknownRowEventType() {
ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN: data[character varying]:'1 2 3'''".getBytes());
new TestDecodingPlugin(null).decode(data, logSequenceNumber);
}
@Test(expected = DecodingException.class)
@SneakyThrows(SQLException.class)
public void assertDecodeTime() {
TimestampUtils timestampUtils = mock(TimestampUtils.class);
when(timestampUtils.toTime(eq(null), eq("1 2 3'"))).thenThrow(new SQLException());
ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[time without time zone]:'1 2 3'''".getBytes());
new TestDecodingPlugin(timestampUtils).decode(data, logSequenceNumber);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册