未验证 提交 f232469e 编写于 作者: Z Zhang Yonglun 提交者: GitHub

Merge pull request #8184 from terrymanu/dev

Move CIRCUIT_BREAK state in to state machine
......@@ -32,48 +32,48 @@ import java.util.Map.Entry;
* @see <a href="https://www.postgresql.org/docs/12/protocol-message-formats.html">ErrorResponse (B)</a>
*/
public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket {
public static final char FIELD_TYPE_SEVERITY = 'S';
public static final char FIELD_TYPE_SEVERITY2 = 'V';
public static final char FIELD_TYPE_CODE = 'C';
public static final char FIELD_TYPE_MESSAGE = 'M';
public static final char FIELD_TYPE_DETAIL = 'D';
public static final char FIELD_TYPE_HINT = 'H';
public static final char FIELD_TYPE_POSITION = 'P';
public static final char FIELD_TYPE_INTERNAL_POSITION = 'p';
public static final char FIELD_TYPE_INTERNAL_QUERY = 'q';
public static final char FIELD_TYPE_WHERE = 'W';
public static final char FIELD_TYPE_SCHEMA_NAME = 's';
public static final char FIELD_TYPE_TABLE_NAME = 't';
public static final char FIELD_TYPE_COLUMN_NAME = 'c';
public static final char FIELD_TYPE_DATA_TYPE_NAME = 'd';
public static final char FIELD_TYPE_CONSTRAINT_NAME = 'n';
public static final char FIELD_TYPE_FILE = 'F';
public static final char FIELD_TYPE_LINE = 'L';
public static final char FIELD_TYPE_ROUTINE = 'R';
@Getter
private final char messageType = PostgreSQLCommandPacketType.ERROR_RESPONSE.getValue();
private final Map<Character, String> fields = new HashMap<>();
@Override
public void write(final PostgreSQLPacketPayload payload) {
for (Entry<Character, String> each : fields.entrySet()) {
......@@ -82,7 +82,7 @@ public final class PostgreSQLErrorResponsePacket implements PostgreSQLPacket {
}
payload.writeInt1(0);
}
/**
* Add field.
*
......
......@@ -22,6 +22,8 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState;
import java.util.Map;
......@@ -40,6 +42,8 @@ public final class ProxyStateMachine {
static {
PROXY_STATE_MAP.put(ProxyStateType.OK, new OKProxyState());
PROXY_STATE_MAP.put(ProxyStateType.LOCK, new LockProxyState());
PROXY_STATE_MAP.put(ProxyStateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
CURRENT_STATE.set(PROXY_STATE_MAP.get(ProxyStateType.OK));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state.impl;
import com.google.common.eventbus.Subscribe;
import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import org.apache.shardingsphere.proxy.frontend.state.ProxyStateMachine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyStateType;
import java.util.Optional;
/**
* Circuit break proxy state.
*/
public final class CircuitBreakProxyState implements ProxyState {
public CircuitBreakProxyState() {
GovernanceEventBus.getInstance().register(this);
}
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException()));
Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
databasePacket.ifPresent(context::writeAndFlush);
}
/**
* Renew circuit breaker state.
*
* @param event circuit state changed event
*/
@Subscribe
public synchronized void renew(final CircuitStateChangedEvent event) {
if (event.isCircuitBreak()) {
ProxyStateMachine.switchState(ProxyStateType.CIRCUIT_BREAK);
} else {
// TODO check previous state, maybe lock
ProxyStateMachine.switchState(ProxyStateType.OK);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.state.impl;
import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
/**
* Lock proxy state.
*/
public final class LockProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
throw new UnsupportedOperationException("LockProxyState");
}
}
......@@ -24,11 +24,11 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.e
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
......@@ -36,7 +36,6 @@ import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
......@@ -65,9 +64,6 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
throw new CircuitBreakException();
}
BackendResponse backendResponse = databaseCommunicationEngine.execute();
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
......
......@@ -24,8 +24,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.que
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
......@@ -56,9 +54,6 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
throw new CircuitBreakException();
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
......
......@@ -28,7 +28,6 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bin
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
......@@ -48,7 +47,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
......@@ -80,9 +78,6 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
return Collections.singletonList(new PostgreSQLErrorResponsePacket());
}
List<DatabasePacket<?>> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
if (null == databaseCommunicationEngine) {
......
......@@ -25,12 +25,10 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.Pos
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
......@@ -62,9 +60,6 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) {
return Collections.singletonList(new PostgreSQLErrorResponsePacket());
}
BackendResponse backendResponse = textProtocolBackendHandler.execute();
if (backendResponse instanceof QueryResponse) {
Optional<PostgreSQLRowDescriptionPacket> result = createQueryPacket((QueryResponse) backendResponse);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册