/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import 
io.netty.channel.socket.SocketChannel;

/**
 * Remoting netty server.
 *
 * <p>Owns a boss/worker pair of event loop groups (epoll when {@link NettyUtils#useEpoll()}
 * reports it, NIO otherwise), a default executor, and the {@link NettyServerHandler} that
 * request processors are registered on.
 *
 * <p>Lifecycle: {@link #start()} binds the configured listen port at most once per started
 * state; {@link #close()} shuts the event loop groups and the default executor down. Those
 * resources are created once in the constructor and never recreated, so an instance is not
 * usable again after {@link #close()}.
 */
public class NettyRemotingServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);

    /**
     * server bootstrap
     */
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();

    /**
     * encoder, a single instance added to every accepted channel's pipeline
     */
    private final NettyEncoder encoder = new NettyEncoder();

    /**
     * default executor, a fixed pool sized by {@code Constants.CPUS}
     */
    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

    /**
     * boss group
     */
    private final EventLoopGroup bossGroup;

    /**
     * worker group
     */
    private final EventLoopGroup workGroup;

    /**
     * server config
     */
    private final NettyServerConfig serverConfig;

    /**
     * server handler, a single instance added to every accepted channel's pipeline
     */
    private final NettyServerHandler serverHandler = new NettyServerHandler(this);

    /**
     * started flag
     */
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
     * server init
     *
     * <p>Creates the boss group with one thread and the worker group with
     * {@code serverConfig.getWorkerThread()} threads. Threads are named
     * {@code NettyServerBossThread_N} and {@code NettyServerWorkerThread_N}, with N starting at 1.
     *
     * @param serverConfig server config
     */
    public NettyRemotingServer(final NettyServerConfig serverConfig) {
        this.serverConfig = serverConfig;

        final ThreadFactory bossThreadFactory = new NamedThreadFactory("NettyServerBossThread_%d");
        final ThreadFactory workerThreadFactory = new NamedThreadFactory("NettyServerWorkerThread_%d");

        if (NettyUtils.useEpoll()) {
            this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
        } else {
            this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
        }
    }

    /**
     * server start
     *
     * <p>Configures the bootstrap and blocks until the bind on the configured listen port
     * completes. Does nothing if the server is already marked as started.
     *
     * <p>The started flag is set before binding and is not reset when the bind fails, so
     * {@link #close()} still releases the event loop groups and the executor afterwards.
     *
     * @throws RuntimeException if the bind fails or the calling thread is interrupted while
     *                          waiting for it; the underlying failure is attached as the cause
     */
    public void start() {
        if (!isStarted.compareAndSet(false, true)) {
            return;
        }

        this.serverBootstrap
            .group(this.bossGroup, this.workGroup)
            .channel(NettyUtils.getServerSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
            .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
            .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
            .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
            .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
            // The type argument is required: with the raw type, initChannel(SocketChannel)
            // would not override the abstract initChannel method.
            .childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    initNettyChannel(ch);
                }
            });

        ChannelFuture future;
        try {
            future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
            throw new RuntimeException(
                String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), e);
        }

        if (future.isSuccess()) {
            logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            return;
        }

        // future.cause() may be null here; RuntimeException accepts a null cause.
        throw new RuntimeException(
            String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
    }

    /**
     * init netty channel
     *
     * <p>Installs the encoder, a fresh decoder and the server handler, in that order. The
     * encoder and the server handler are the same instances for every channel; only the
     * decoder is created per channel.
     * NOTE(review): handlers shared across pipelines presumably need to be annotated
     * {@code @ChannelHandler.Sharable} - confirm on NettyEncoder and NettyServerHandler.
     *
     * @param ch socket channel
     */
    private void initNettyChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("encoder", encoder);
        pipeline.addLast("decoder", new NettyDecoder());
        pipeline.addLast("handler", serverHandler);
    }

    /**
     * register processor
     *
     * <p>Equivalent to calling the three-argument overload with a {@code null} executor.
     *
     * @param commandType command type
     * @param processor processor
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    /**
     * register processor
     *
     * <p>Delegates to the server handler.
     *
     * @param commandType command type
     * @param processor processor
     * @param executor thread executor; the no-executor overload passes {@code null} here
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor,
                                  final ExecutorService executor) {
        this.serverHandler.registerProcessor(commandType, processor, executor);
    }

    /**
     * get default thread executor
     *
     * @return thread executor
     */
    public ExecutorService getDefaultExecutor() {
        return defaultExecutor;
    }

    /**
     * server close
     *
     * <p>Starts a graceful shutdown of both event loop groups and shuts the default executor
     * down. Does nothing unless the server is currently marked as started. Exceptions raised
     * while shutting down are logged and not rethrown.
     */
    public void close() {
        if (!isStarted.compareAndSet(true, false)) {
            return;
        }
        try {
            // Both groups are final and always assigned by the constructor, so no null checks.
            this.bossGroup.shutdownGracefully();
            this.workGroup.shutdownGracefully();
            defaultExecutor.shutdown();
        } catch (Exception ex) {
            logger.error("netty server close exception", ex);
        }
        logger.info("netty server closed");
    }

    /**
     * Thread factory that names each thread by formatting a 1-based counter into a pattern.
     */
    private static final class NamedThreadFactory implements ThreadFactory {

        /**
         * format pattern with a single {@code %d} placeholder for the thread index
         */
        private final String nameFormat;

        /**
         * number of threads created so far
         */
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        NamedThreadFactory(final String nameFormat) {
            this.nameFormat = nameFormat;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format(nameFormat, threadIndex.incrementAndGet()));
        }
    }
}