未验证 提交 b3cd9f75 编写于 作者: L Liang Zhang 提交者: GitHub

Refactor QueryCommandExecutor.isQueryResponse for impl (#7041)

* Decouple AuthenticationEngine and BackendConnection

* Decouple AuthenticationEngine and BackendConnection

* Unify userName to username

* Refactor CommandExecutorTask

* Refactor MySQLComQueryPacketExecutor.isQueryResponse

* Refactor MySQLComStmtExecuteExecutor.isQueryResponse

* Refactor PostgreSQLComBindExecutor.isQueryResponse

* Refactor PostgreSQLComQueryExecutor.isQueryResponse
上级 45d0cc11
......@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.hook.RootInvokeHook;
import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
import org.apache.shardingsphere.metrics.enums.MetricsLabelEnum;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStateHandler;
import org.apache.shardingsphere.proxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.engine.CommandExecuteEngine;
......@@ -70,8 +71,9 @@ public final class CommandExecutorTask implements Runnable {
boolean isNeedFlush = false;
try (BackendConnection backendConnection = this.backendConnection;
PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
backendConnection.getStateHandler().waitUntilConnectionReleasedIfNecessary();
backendConnection.getStateHandler().setRunningStatusIfNecessary();
ConnectionStateHandler stateHandler = backendConnection.getStateHandler();
stateHandler.waitUntilConnectionReleasedIfNecessary();
stateHandler.setRunningStatusIfNecessary();
isNeedFlush = executeCommand(context, payload, backendConnection);
connectionSize = backendConnection.getConnectionSize();
// CHECKSTYLE:OFF
......
......@@ -44,13 +44,13 @@ public final class MySQLAuthenticationHandler {
/**
* Login.
*
* @param userName user name.
* @param username username.
* @param authResponse auth response
* @param database database
* @return login success or failure
*/
public Optional<MySQLServerErrorCode> login(final String userName, final byte[] authResponse, final String database) {
Optional<ProxyUser> user = getUser(userName);
public Optional<MySQLServerErrorCode> login(final String username, final byte[] authResponse, final String database) {
Optional<ProxyUser> user = getUser(username);
if (!user.isPresent() || !isPasswordRight(user.get().getPassword(), authResponse)) {
return Optional.of(MySQLServerErrorCode.ER_ACCESS_DENIED_ERROR);
}
......
......@@ -74,7 +74,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
@SneakyThrows
public void writeQueryData(final ChannelHandlerContext context,
final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) {
if (!queryCommandExecutor.isQuery() || !context.channel().isActive()) {
if (!queryCommandExecutor.isQueryResponse() || !context.channel().isActive()) {
return;
}
int count = 0;
......
......@@ -57,7 +57,8 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
private final DatabaseCommunicationEngine databaseCommunicationEngine;
private volatile boolean isQuery;
@Getter
private volatile boolean isQueryResponse;
@Getter
private volatile boolean isUpdateResponse;
......@@ -88,7 +89,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
isUpdateResponse = true;
return Collections.singletonList(createUpdatePacket((UpdateResponse) backendResponse));
}
isQuery = true;
isQueryResponse = true;
return createQueryPacket((QueryResponse) backendResponse);
}
......@@ -112,11 +113,6 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
return result;
}
@Override
public boolean isQuery() {
return isQuery;
}
@Override
public boolean next() throws SQLException {
return databaseCommunicationEngine.next();
......
......@@ -56,7 +56,8 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
private final TextProtocolBackendHandler textProtocolBackendHandler;
private volatile boolean isQuery;
@Getter
private volatile boolean isQueryResponse;
@Getter
private volatile boolean isUpdateResponse;
......@@ -84,7 +85,7 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
isUpdateResponse = true;
return Collections.singletonList(createUpdatePacket((UpdateResponse) backendResponse));
}
isQuery = true;
isQueryResponse = true;
return createQueryPackets((QueryResponse) backendResponse);
}
......@@ -125,11 +126,6 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
return result;
}
@Override
public boolean isQuery() {
return isQuery;
}
@Override
public boolean next() throws SQLException {
return textProtocolBackendHandler.next();
......
......@@ -111,6 +111,6 @@ public final class MySQLComStmtExecuteExecutorTest {
FieldSetter.setField(mysqlComStmtExecuteExecutor, MySQLComStmtExecuteExecutor.class.getDeclaredField("databaseCommunicationEngine"), databaseCommunicationEngine);
when(databaseCommunicationEngine.execute()).thenReturn(new QueryResponse(Collections.singletonList(mock(QueryHeader.class))));
mysqlComStmtExecuteExecutor.execute();
assertThat(mysqlComStmtExecuteExecutor.isQuery(), Matchers.is(true));
assertThat(mysqlComStmtExecuteExecutor.isQueryResponse(), Matchers.is(true));
}
}
......@@ -73,6 +73,6 @@ public final class MySQLComQueryPacketExecutorTest {
FieldSetter.setField(mysqlComQueryPacketExecutor, MySQLComQueryPacketExecutor.class.getDeclaredField("textProtocolBackendHandler"), textProtocolBackendHandler);
when(textProtocolBackendHandler.execute()).thenReturn(new QueryResponse(Collections.singletonList(mock(QueryHeader.class))));
mysqlComQueryPacketExecutor.execute();
assertThat(mysqlComQueryPacketExecutor.isQuery(), Matchers.is(true));
assertThat(mysqlComQueryPacketExecutor.isQueryResponse(), Matchers.is(true));
}
}
......@@ -18,9 +18,6 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.auth;
import com.google.common.base.Strings;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;
......@@ -29,6 +26,10 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.handshake.Postgre
import org.apache.shardingsphere.infra.auth.ProxyUser;
import org.apache.shardingsphere.proxy.backend.schema.ProxySchemaContexts;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.Map;
/**
* Authentication handler for PostgreSQL.
*/
......@@ -37,39 +38,36 @@ public class PostgreSQLAuthenticationHandler {
/**
* Login.
*
* @param userName user name
* @param databaseName database name
* @param md5Salt md5 salt
* @param username username
* @param databaseName database name
* @param md5Salt MD5 salt
* @param passwordMessagePacket password message packet
* @return PostgreSQLLoginResult
* @return PostgreSQL login result
*/
public static PostgreSQLLoginResult loginWithMd5Password(final String userName, final String databaseName, final byte[] md5Salt, final PostgreSQLPasswordMessagePacket passwordMessagePacket) {
public static PostgreSQLLoginResult loginWithMd5Password(final String username, final String databaseName, final byte[] md5Salt, final PostgreSQLPasswordMessagePacket passwordMessagePacket) {
ProxyUser proxyUser = null;
for (Map.Entry<String, ProxyUser> entry : ProxySchemaContexts.getInstance().getSchemaContexts().getAuthentication().getUsers().entrySet()) {
if (entry.getKey().equals(userName)) {
if (entry.getKey().equals(username)) {
proxyUser = entry.getValue();
break;
}
}
if (null == proxyUser) {
return new PostgreSQLLoginResult(PostgreSQLErrorCode.INVALID_AUTHORIZATION_SPECIFICATION, "unknown userName: " + userName);
return new PostgreSQLLoginResult(PostgreSQLErrorCode.INVALID_AUTHORIZATION_SPECIFICATION, "unknown username: " + username);
}
String md5Digest = passwordMessagePacket.getMd5Digest();
String expectedMd5Digest = md5Encode(userName, proxyUser.getPassword(), md5Salt);
String expectedMd5Digest = md5Encode(username, proxyUser.getPassword(), md5Salt);
if (!expectedMd5Digest.equals(md5Digest)) {
return new PostgreSQLLoginResult(PostgreSQLErrorCode.INVALID_PASSWORD, "password authentication failed for user \"" + userName + "\"");
return new PostgreSQLLoginResult(PostgreSQLErrorCode.INVALID_PASSWORD, "password authentication failed for user \"" + username + "\"");
}
if (!isAuthorizedSchema(proxyUser.getAuthorizedSchemas(), databaseName)) {
return new PostgreSQLLoginResult(PostgreSQLErrorCode.PRIVILEGE_NOT_GRANTED, String.format("Access denied for user '%s' to database '%s'", userName, databaseName));
return new PostgreSQLLoginResult(PostgreSQLErrorCode.PRIVILEGE_NOT_GRANTED, String.format("Access denied for user '%s' to database '%s'", username, databaseName));
}
return new PostgreSQLLoginResult(PostgreSQLErrorCode.SUCCESSFUL_COMPLETION, null);
}
private static String md5Encode(final String userName, final String password, final byte[] md5Salt) {
String passwordHash = new String(Hex.encodeHex(DigestUtils.md5(password + userName), true));
private static String md5Encode(final String username, final String password, final byte[] md5Salt) {
String passwordHash = new String(Hex.encodeHex(DigestUtils.md5(password + username), true));
MessageDigest messageDigest = DigestUtils.getMd5Digest();
messageDigest.update(passwordHash.getBytes());
messageDigest.update(md5Salt);
......
......@@ -77,7 +77,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
@SneakyThrows
public void writeQueryData(final ChannelHandlerContext context,
final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) {
if (queryCommandExecutor.isQuery() && !context.channel().isActive()) {
if (queryCommandExecutor.isQueryResponse() && !context.channel().isActive()) {
context.write(new PostgreSQLCommandCompletePacket());
context.write(new PostgreSQLReadyForQueryPacket());
return;
......
......@@ -63,7 +63,8 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
private final DatabaseCommunicationEngine databaseCommunicationEngine;
private volatile boolean isQuery;
@Getter
private volatile boolean isQueryResponse;
@Getter
private volatile boolean isUpdateResponse;
......@@ -118,7 +119,7 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponse queryResponse) {
List<PostgreSQLColumnDescription> columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse);
isQuery = !columnDescriptions.isEmpty();
isQueryResponse = !columnDescriptions.isEmpty();
if (columnDescriptions.isEmpty() || packet.isBinaryRowData()) {
return Optional.empty();
}
......@@ -136,11 +137,6 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
return result;
}
@Override
public boolean isQuery() {
return isQuery;
}
@Override
public boolean next() throws SQLException {
return null != databaseCommunicationEngine && databaseCommunicationEngine.next();
......
......@@ -55,7 +55,8 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
private final TextProtocolBackendHandler textProtocolBackendHandler;
private volatile boolean isQuery;
@Getter
private volatile boolean isQueryResponse;
@Getter
private volatile boolean isUpdateResponse;
......@@ -95,7 +96,7 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponse queryResponse) {
List<PostgreSQLColumnDescription> columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse);
isQuery = !columnDescriptions.isEmpty();
isQueryResponse = !columnDescriptions.isEmpty();
if (columnDescriptions.isEmpty()) {
return Optional.empty();
}
......@@ -105,7 +106,7 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
private List<PostgreSQLColumnDescription> getPostgreSQLColumnDescriptions(final QueryResponse queryResponse) {
List<PostgreSQLColumnDescription> result = new LinkedList<>();
List<QueryResult> queryResults = queryResponse.getQueryResults();
ResultSetMetaData resultSetMetaData = !queryResults.isEmpty() ? queryResults.get(0).getResultSetMetaData() : null;
ResultSetMetaData resultSetMetaData = queryResults.isEmpty() ? null : queryResults.get(0).getResultSetMetaData();
int columnIndex = 0;
for (QueryHeader each : queryResponse.getQueryHeaders()) {
result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), resultSetMetaData));
......@@ -113,11 +114,6 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
return result;
}
@Override
public boolean isQuery() {
return isQuery;
}
@Override
public boolean next() throws SQLException {
return textProtocolBackendHandler.next();
......
......@@ -44,7 +44,7 @@ public final class PostgreSQLCommandExecuteEngineTest {
@SneakyThrows
public void assertWriteQueryDataWithError() {
PostgreSQLCommandExecuteEngine postgreSQLCommandExecuteEngine = new PostgreSQLCommandExecuteEngine();
when(queryCommandExecutor.isQuery()).thenReturn(false);
when(queryCommandExecutor.isQueryResponse()).thenReturn(false);
when(queryCommandExecutor.isErrorResponse()).thenReturn(true);
postgreSQLCommandExecuteEngine.writeQueryData(channelHandlerContext, null, queryCommandExecutor, 0);
verify(channelHandlerContext, times(1)).write(isA(PostgreSQLReadyForQueryPacket.class));
......
......@@ -26,6 +26,13 @@ import java.sql.SQLException;
*/
public interface QueryCommandExecutor extends CommandExecutor {
/**
* Judge is query SQL or not.
*
* @return is query SQL or not
*/
boolean isQueryResponse();
/**
* Judge is update response.
*
......@@ -40,13 +47,6 @@ public interface QueryCommandExecutor extends CommandExecutor {
*/
boolean isErrorResponse();
/**
* Judge is query SQL or not.
*
* @return is query SQL or not
*/
boolean isQuery();
/**
* Goto next result value.
*
......@@ -61,5 +61,5 @@ public interface QueryCommandExecutor extends CommandExecutor {
* @return database packet of query data
* @throws SQLException SQL exception
*/
DatabasePacket getQueryData() throws SQLException;
DatabasePacket<?> getQueryData() throws SQLException;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册