提交 4cc45d3f 编写于 作者: T terrymanu

refactor NettyBackendHandler

上级 b28c9fb6
......@@ -17,7 +17,6 @@
package io.shardingsphere.proxy.backend.netty;
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.pool.SimpleChannelPool;
import io.shardingsphere.core.constant.DatabaseType;
......@@ -36,6 +35,7 @@ import io.shardingsphere.proxy.backend.AbstractBackendHandler;
import io.shardingsphere.proxy.backend.BackendExecutorContext;
import io.shardingsphere.proxy.backend.ResultPacket;
import io.shardingsphere.proxy.backend.netty.future.FutureRegistry;
import io.shardingsphere.proxy.backend.netty.future.SynchronizedFuture;
import io.shardingsphere.proxy.backend.netty.mysql.MySQLQueryResult;
import io.shardingsphere.proxy.config.ProxyTableMetaDataConnectionManager;
import io.shardingsphere.proxy.config.RuleRegistry;
......@@ -46,7 +46,6 @@ import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePac
import io.shardingsphere.proxy.transport.mysql.packet.command.query.text.query.ComQueryPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket;
import io.shardingsphere.proxy.backend.netty.future.SynchronizedFuture;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......@@ -126,8 +125,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
}
List<QueryResult> queryResults = synchronizedFuture.get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
FutureRegistry.getInstance().delete(connectionId);
List<CommandResponsePackets> packets = Lists.newArrayListWithCapacity(queryResults.size());
List<CommandResponsePackets> packets = new ArrayList<>(queryResults.size());
for (QueryResult each : queryResults) {
MySQLQueryResult queryResult = (MySQLQueryResult) each;
if (0 == currentSequenceId) {
......@@ -138,13 +136,10 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
}
packets.add(queryResult.getCommandResponsePackets());
}
CommandResponsePackets result = merge(routeResult.getSqlStatement(), packets, queryResults);
SQLStatement sqlStatement = routeResult.getSqlStatement();
if (!RULE_REGISTRY.isMasterSlaveOnly() && SQLType.DDL == sqlStatement.getType() && !sqlStatement.getTables().isEmpty()) {
String logicTableName = sqlStatement.getTables().getSingleTableName();
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(
BackendExecutorContext.getInstance().getExecutorService(), new ProxyTableMetaDataConnectionManager(RULE_REGISTRY.getBackendDataSource()));
RULE_REGISTRY.getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, RULE_REGISTRY.getShardingRule()));
CommandResponsePackets result = merge(sqlStatement, packets, queryResults);
if (SQLType.DDL == sqlStatement.getType() && !sqlStatement.getTables().isEmpty()) {
refreshTableMetaData(sqlStatement.getTables().getSingleTableName());
}
return result;
}
......@@ -201,6 +196,13 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
return packets.get(0);
}
// TODO :jiaqi use sql packet to refresh meta data
private void refreshTableMetaData(final String logicTableName) {
TableMetaDataLoader tableMetaDataLoader = new TableMetaDataLoader(
BackendExecutorContext.getInstance().getExecutorService(), new ProxyTableMetaDataConnectionManager(RULE_REGISTRY.getBackendDataSource()));
RULE_REGISTRY.getMetaData().getTable().put(logicTableName, tableMetaDataLoader.load(logicTableName, RULE_REGISTRY.getShardingRule()));
}
@Override
public boolean next() throws SQLException {
if (null == mergedResult || !mergedResult.next()) {
......
......@@ -17,10 +17,10 @@
package io.shardingsphere.proxy.backend.netty.future;
import com.google.common.collect.Lists;
import io.shardingsphere.core.merger.QueryResult;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
......@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
* @author linjiaqi
*/
@Slf4j
public class SynchronizedFuture implements Future<List<QueryResult>> {
public final class SynchronizedFuture implements Future<List<QueryResult>> {
private final CountDownLatch latch;
......@@ -43,7 +43,7 @@ public class SynchronizedFuture implements Future<List<QueryResult>> {
public SynchronizedFuture(final int resultSize) {
latch = new CountDownLatch(resultSize);
responses = Lists.newArrayListWithCapacity(resultSize);
responses = new ArrayList<>(resultSize);
}
@Override
......
......@@ -45,6 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
*/
@Slf4j
public final class MySQLQueryResult implements QueryResult {
@Getter
private final CommandResponsePackets commandResponsePackets;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册