diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java index f5fec55c2c25ca221e4280c05769bab99da3fd43..9cbd33b27ea147f52acabef22471536c5ae0c613 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/NettyBackendHandler.java @@ -17,7 +17,6 @@ package io.shardingsphere.proxy.backend.netty; -import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.pool.SimpleChannelPool; import io.shardingsphere.core.constant.DatabaseType; @@ -36,6 +35,7 @@ import io.shardingsphere.proxy.backend.AbstractBackendHandler; import io.shardingsphere.proxy.backend.BackendExecutorContext; import io.shardingsphere.proxy.backend.ResultPacket; import io.shardingsphere.proxy.backend.netty.future.FutureRegistry; +import io.shardingsphere.proxy.backend.netty.future.SynchronizedFuture; import io.shardingsphere.proxy.backend.netty.mysql.MySQLQueryResult; import io.shardingsphere.proxy.config.ProxyTableMetaDataConnectionManager; import io.shardingsphere.proxy.config.RuleRegistry; @@ -46,7 +46,6 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePac import io.shardingsphere.proxy.transport.mysql.packet.command.query.text.query.ComQueryPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; -import io.shardingsphere.proxy.backend.netty.future.SynchronizedFuture; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -126,8 +125,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler { } List queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS); FutureRegistry.getInstance().delete(connectionId); - - List packets = Lists.newArrayListWithCapacity(queryResults.size()); + List packets = new ArrayList<>(queryResults.size()); for (QueryResult each : queryResults) { MySQLQueryResult queryResult = (MySQLQueryResult) each; if (0 == currentSequenceId) { @@ -138,13 +136,10 @@ public final class NettyBackendHandler extends AbstractBackendHandler { } packets.add(queryResult.getCommandResponsePackets()); } - CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets, queryResults); SQLStatement sqlStatement = routeResult.getSqlStatement(); - if (!RULE_REGISTRY.isMasterSlaveOnly() && SQLType.DDL == sqlStatement.getType() && !sqlStatement.getTables().isEmpty()) { - String logicTableName = sqlStatement.getTables().getSingleTableName(); - TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader( - BackendExecutorContext.getInstance().getExecutorService(), new ProxyTableMetaDataConnectionManager(RULE_REGISTRY.getBackendDataSource())); - RULE_REGISTRY.getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, RULE_REGISTRY.getShardingRule())); + CommandResponsePackets result = merge(sqlStatement, packets, queryResults); + if (SQLType.DDL == sqlStatement.getType() && !sqlStatement.getTables().isEmpty()) { + refreshTableMetaData(sqlStatement.getTables().getSingleTableName()); } return result; } @@ -201,6 +196,13 @@ public final class NettyBackendHandler extends AbstractBackendHandler { return packets.get(0); } + // TODO :jiaqi use sql packet to refresh meta data + private void refreshTableMetaData(final String logicTableName) { + TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader( + BackendExecutorContext.getInstance().getExecutorService(), new ProxyTableMetaDataConnectionManager(RULE_REGISTRY.getBackendDataSource())); + RULE_REGISTRY.getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, RULE_REGISTRY.getShardingRule())); + } + @Override public boolean next() throws SQLException { if (null == mergedResult || !mergedResult.next()) { diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/future/SynchronizedFuture.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/future/SynchronizedFuture.java index 0baff8fe1a3a175c56ef3718aa5a11eb6c120ee2..477b359cd702820ab80ae5acab972accedbe23fe 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/future/SynchronizedFuture.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/future/SynchronizedFuture.java @@ -17,10 +17,10 @@ package io.shardingsphere.proxy.backend.netty.future; -import com.google.common.collect.Lists; import io.shardingsphere.core.merger.QueryResult; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; * @author linjiaqi */ @Slf4j -public class SynchronizedFuture implements Future> { +public final class SynchronizedFuture implements Future> { private final CountDownLatch latch; @@ -43,7 +43,7 @@ public class SynchronizedFuture implements Future> { public SynchronizedFuture(final int resultSize) { latch = new CountDownLatch(resultSize); - responses = Lists.newArrayListWithCapacity(resultSize); + responses = new ArrayList<>(resultSize); } @Override diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/mysql/MySQLQueryResult.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/mysql/MySQLQueryResult.java index a36c6e5160b59025aa141ec87cdaf886c00dec33..285d28b5127d64f7fd34abff926da4757baaec32 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/mysql/MySQLQueryResult.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/netty/mysql/MySQLQueryResult.java @@ -45,6 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue; */ @Slf4j public final class MySQLQueryResult implements QueryResult { + @Getter private final CommandResponsePackets commandResponsePackets;