diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java index ec7fadcc405643175f074392ad12b616e0467665..edf9e92c8cec75460d708d20db93177682a1fa54 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java @@ -17,12 +17,17 @@ package org.apache.shardingsphere.proxy.frontend.state.impl; +import com.google.common.eventbus.Subscribe; import io.netty.channel.ChannelHandlerContext; import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; +import org.apache.shardingsphere.governance.core.event.GovernanceEventBus; +import org.apache.shardingsphere.governance.core.registry.event.CircuitStateChangedEvent; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException; import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; import org.apache.shardingsphere.proxy.frontend.state.ProxyState; +import org.apache.shardingsphere.proxy.frontend.state.ProxyStateMachine; +import org.apache.shardingsphere.proxy.frontend.state.ProxyStateType; import java.util.Optional; @@ -31,10 +36,29 @@ import java.util.Optional; */ public final class CircuitBreakProxyState implements ProxyState { + public CircuitBreakProxyState() { + GovernanceEventBus.getInstance().register(this); + } + @Override public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) { context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException())); Optional> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(); databasePacket.ifPresent(context::writeAndFlush); } + + /** + * Renew circuit breaker state. + * + * @param event circuit state changed event + */ + @Subscribe + public synchronized void renew(final CircuitStateChangedEvent event) { + if (event.isCircuitBreak()) { + ProxyStateMachine.switchState(ProxyStateType.CIRCUIT_BREAK); + } else { + // TODO check previous state, maybe lock + ProxyStateMachine.switchState(ProxyStateType.OK); + } + } }