提交 f51b1c8e 编写于 作者: T terrymanu

ShardingProxyClient => BackendNettyClient

上级 4cc45d3f
...@@ -34,26 +34,30 @@ import io.netty.channel.pool.SimpleChannelPool; ...@@ -34,26 +34,30 @@ import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.shardingsphere.core.rule.DataSourceParameter; import io.shardingsphere.core.rule.DataSourceParameter;
import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.config.RuleRegistry;
import lombok.AccessLevel;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/** /**
* Sharding-Proxy Client. * Backend connection client for netty.
* *
* @author wangkai * @author wangkai
* @author linjiaqi * @author linjiaqi
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j @Slf4j
public final class ShardingProxyClient { public final class BackendNettyClient {
private static final ShardingProxyClient INSTANCE = new ShardingProxyClient(); private static final BackendNettyClient INSTANCE = new BackendNettyClient();
private static final RuleRegistry RULE_REGISTRY = RuleRegistry.getInstance(); private static final RuleRegistry RULE_REGISTRY = RuleRegistry.getInstance();
...@@ -71,23 +75,28 @@ public final class ShardingProxyClient { ...@@ -71,23 +75,28 @@ public final class ShardingProxyClient {
private ChannelPoolMap<String, SimpleChannelPool> poolMap; private ChannelPoolMap<String, SimpleChannelPool> poolMap;
/** /**
* Start Sharding-Proxy. * Get instance of backend connection client for netty.
* *
* @throws MalformedURLException url is illegal. * @return instance of backend connection client for netty
*/
public static BackendNettyClient getInstance() {
return INSTANCE;
}
/**
* Start backend connection client for netty.
*
* @throws MalformedURLException URL is illegal
* @throws InterruptedException interrupted exception * @throws InterruptedException interrupted exception
*/ */
public void start() throws MalformedURLException, InterruptedException { public void start() throws MalformedURLException, InterruptedException {
Map<String, DataSourceParameter> dataSourceConfigurationMap = RULE_REGISTRY.getDataSourceConfigurationMap(); Map<String, DataSourceParameter> dataSourceConfigurationMap = RULE_REGISTRY.getDataSourceConfigurationMap();
for (Map.Entry<String, DataSourceParameter> each : dataSourceConfigurationMap.entrySet()) { for (Entry<String, DataSourceParameter> each : dataSourceConfigurationMap.entrySet()) {
URL url = new URL(each.getValue().getUrl().replaceAll("jdbc:mysql:", "http:")); URL url = new URL(each.getValue().getUrl().replaceAll("jdbc:mysql:", "http:"));
final String ip = url.getHost(); dataSourceConfigMap.put(each.getKey(), new DataSourceConfig(url.getHost(), url.getPort(), url.getPath().substring(1), each.getValue().getUsername(), each.getValue().getPassword()));
final int port = url.getPort();
final String database = url.getPath().substring(1);
final String username = (each.getValue()).getUsername();
final String password = (each.getValue()).getPassword();
dataSourceConfigMap.put(each.getKey(), new DataSourceConfig(ip, port, database, username, password));
} }
final Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
// TODO :jiaqi where to init workerGroup?
if (workerGroup instanceof EpollEventLoopGroup) { if (workerGroup instanceof EpollEventLoopGroup) {
groupsEpoll(bootstrap); groupsEpoll(bootstrap);
} else { } else {
...@@ -97,10 +106,10 @@ public final class ShardingProxyClient { ...@@ -97,10 +106,10 @@ public final class ShardingProxyClient {
} }
/** /**
* Stop Sharding-Proxy. * Stop backend connection client for netty.
*/ */
public void stop() { public void stop() {
if (workerGroup != null) { if (null != workerGroup) {
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
} }
} }
...@@ -149,13 +158,4 @@ public final class ShardingProxyClient { ...@@ -149,13 +158,4 @@ public final class ShardingProxyClient {
} }
} }
} }
/**
* Get instance of sharding-proxy client.
*
* @return instance of sharding-proxy client
*/
public static ShardingProxyClient getInstance() {
return INSTANCE;
}
} }
...@@ -25,13 +25,13 @@ import io.shardingsphere.proxy.transport.common.codec.PacketCodecFactory; ...@@ -25,13 +25,13 @@ import io.shardingsphere.proxy.transport.common.codec.PacketCodecFactory;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
/** /**
* Channel initializer. * Channel initializer for backend connection netty client.
* *
* @author wangkai * @author wangkai
* @author linjiaqi * @author linjiaqi
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
public final class ClientHandlerInitializer extends ChannelInitializer<Channel> { public final class BackendNettyClientChannelInitializer extends ChannelInitializer<Channel> {
private final DataSourceConfig dataSourceConfig; private final DataSourceConfig dataSourceConfig;
......
...@@ -148,7 +148,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler { ...@@ -148,7 +148,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
if (!channelMap.containsKey(dataSourceName)) { if (!channelMap.containsKey(dataSourceName)) {
channelMap.put(dataSourceName, new ArrayList<Channel>()); channelMap.put(dataSourceName, new ArrayList<Channel>());
} }
SimpleChannelPool pool = ShardingProxyClient.getInstance().getPoolMap().get(dataSourceName); SimpleChannelPool pool = BackendNettyClient.getInstance().getPoolMap().get(dataSourceName);
Channel channel = pool.acquire().get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS); Channel channel = pool.acquire().get(RULE_REGISTRY.getBackendNIOConfig().getConnectionTimeoutSeconds(), TimeUnit.SECONDS);
channelMap.get(dataSourceName).add(channel); channelMap.get(dataSourceName).add(channel);
ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), connectionId); ChannelRegistry.getInstance().putConnectionId(channel.id().asShortText(), connectionId);
...@@ -208,7 +208,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler { ...@@ -208,7 +208,7 @@ public final class NettyBackendHandler extends AbstractBackendHandler {
if (null == mergedResult || !mergedResult.next()) { if (null == mergedResult || !mergedResult.next()) {
for (Entry<String, List<Channel>> entry : channelMap.entrySet()) { for (Entry<String, List<Channel>> entry : channelMap.entrySet()) {
for (Channel each : entry.getValue()) { for (Channel each : entry.getValue()) {
ShardingProxyClient.getInstance().getPoolMap().get(entry.getKey()).release(each); BackendNettyClient.getInstance().getPoolMap().get(entry.getKey()).release(each);
} }
} }
return false; return false;
......
...@@ -46,6 +46,6 @@ public class NettyChannelPoolHandler implements ChannelPoolHandler { ...@@ -46,6 +46,6 @@ public class NettyChannelPoolHandler implements ChannelPoolHandler {
@Override @Override
public void channelCreated(final Channel channel) { public void channelCreated(final Channel channel) {
log.info("channelCreated. Channel ID: {}" + channel.id().asShortText()); log.info("channelCreated. Channel ID: {}" + channel.id().asShortText());
channel.pipeline().addLast(new ClientHandlerInitializer(dataSourceConfig)); channel.pipeline().addLast(new BackendNettyClientChannelInitializer(dataSourceConfig));
} }
} }
...@@ -30,7 +30,7 @@ import io.netty.channel.nio.NioEventLoopGroup; ...@@ -30,7 +30,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import io.shardingsphere.proxy.backend.netty.ShardingProxyClient; import io.shardingsphere.proxy.backend.netty.BackendNettyClient;
import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.config.RuleRegistry;
import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer; import io.shardingsphere.proxy.frontend.common.netty.ServerHandlerInitializer;
import io.shardingsphere.proxy.backend.BackendExecutorContext; import io.shardingsphere.proxy.backend.BackendExecutorContext;
...@@ -70,7 +70,7 @@ public final class ShardingProxy { ...@@ -70,7 +70,7 @@ public final class ShardingProxy {
public void start(final int port) throws InterruptedException, MalformedURLException { public void start(final int port) throws InterruptedException, MalformedURLException {
try { try {
if (RULE_REGISTRY.getBackendNIOConfig().isUseNIO()) { if (RULE_REGISTRY.getBackendNIOConfig().isUseNIO()) {
ShardingProxyClient.getInstance().start(); BackendNettyClient.getInstance().start();
} }
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
bossGroup = createEventLoopGroup(); bossGroup = createEventLoopGroup();
...@@ -87,7 +87,7 @@ public final class ShardingProxy { ...@@ -87,7 +87,7 @@ public final class ShardingProxy {
frontendExecutorContext.getExecutorService().shutdown(); frontendExecutorContext.getExecutorService().shutdown();
backendExecutorContext.getExecutorService().shutdown(); backendExecutorContext.getExecutorService().shutdown();
if (RULE_REGISTRY.getBackendNIOConfig().isUseNIO()) { if (RULE_REGISTRY.getBackendNIOConfig().isUseNIO()) {
ShardingProxyClient.getInstance().stop(); BackendNettyClient.getInstance().stop();
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册