/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.hongxi.whatsmars.remoting.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import org.hongxi.whatsmars.remoting.ChannelEventListener; import org.hongxi.whatsmars.remoting.InvokeCallback; import org.hongxi.whatsmars.remoting.RPCHook; import org.hongxi.whatsmars.remoting.common.Pair; import org.hongxi.whatsmars.remoting.common.RemotingHelper; import org.hongxi.whatsmars.remoting.common.SemaphoreReleaseOnlyOnce; import org.hongxi.whatsmars.remoting.common.ServiceThread; import org.hongxi.whatsmars.remoting.exception.RemotingSendRequestException; import org.hongxi.whatsmars.remoting.exception.RemotingTimeoutException; import org.hongxi.whatsmars.remoting.exception.RemotingTooMuchRequestException; import org.hongxi.whatsmars.remoting.protocol.RemotingCommand; import org.hongxi.whatsmars.remoting.protocol.RemotingSysResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketAddress; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.*; public abstract class NettyRemotingAbstract { /** * Remoting logger instance. */ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); /** * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint. */ protected final Semaphore semaphoreOneway; /** * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint. */ protected final Semaphore semaphoreAsync; /** * This map caches all on-going requests. */ protected final ConcurrentMap responseTable = new ConcurrentHashMap(256); /** * This container holds all processors per request code, aka, for each incoming request, we may look up the * responding processor in this map to handle the request. */ protected final HashMap> processorTable = new HashMap>(64); /** * Executor to feed netty events to user defined {@link ChannelEventListener}. */ protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); /** * The default request processor to use in case there is no exact match in {@link #processorTable} per request code. */ protected Pair defaultRequestProcessor; /** * SSL context via which to create {@link SslHandler}. */ protected volatile SslContext sslContext; static { NettyLogger.initNettyLogger(); } /** * Constructor, specifying capacity of one-way and asynchronous semaphores. * * @param permitsOneway Number of permits for one-way requests. * @param permitsAsync Number of permits for asynchronous requests. */ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreAsync = new Semaphore(permitsAsync, true); } /** * Custom channel event listener. * * @return custom channel event listener if defined; null otherwise. */ public abstract ChannelEventListener getChannelEventListener(); /** * Put a netty event to the executor. * * @param event Netty event instance. */ public void putNettyEvent(final NettyEvent event) { this.nettyEventExecutor.putNettyEvent(event); } /** * Entry of incoming command processing. * *

* Note: * The incoming remoting command may be *

    *
  • An inquiry request from a remote peer component;
  • *
  • A response to a previous request issued by this very participant.
  • *
*

* * @param ctx Channel handler context. * @param msg incoming remoting command. * @throws Exception if there were any error while processing the incoming command. */ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } } /** * Process incoming request command issued by remote peer. * * @param ctx channel handler context. * @param cmd request command. */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } } /** * Process response from remote peer to the previous issued requests. * * @param ctx channel handler context. * @param cmd response command instance. */ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } } /** * Execute callback in callback executor. If callback executor is null, run directly in current thread */ private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } finally { responseFuture.release(); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } } /** * Custom RPC hook. * * @return RPC hook if specified; null otherwise. */ public abstract RPCHook getRPCHook(); /** * This method specifies thread pool to use while invoking callback methods. * * @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the * netty event-loop thread. */ public abstract ExecutorService getCallbackExecutor(); /** *

* This method is periodically invoked to scan and expire deprecated request. *

*/ public void scanResponseTable() { final List rfList = new LinkedList(); Iterator> it = this.responseTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); ResponseFuture rep = next.getValue(); if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); log.warn("remove timeout request, " + rep); } } for (ResponseFuture rf : rfList) { try { executeInvokeCallback(rf); } catch (Throwable e) { log.warn("scanResponseTable, operationComplete Exception", e); } } } public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } } private void requestFail(final int opaque) { ResponseFuture responseFuture = responseTable.remove(opaque); if (responseFuture != null) { responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); try { executeInvokeCallback(responseFuture); } catch (Throwable e) { log.warn("execute callback in requestFail, and callback throw", e); } finally { responseFuture.release(); } } } /** * mark the request of the specified channel as fail and to invoke fail callback immediately * @param channel the channel which is close already */ protected void failFast(final Channel channel) { Iterator> it = responseTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); if (entry.getValue().getProcessChannel() == channel) { Integer opaque = entry.getKey(); if (opaque != null) { requestFail(opaque); } } } } public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } } class NettyEventExecutor extends ServiceThread { private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); private final int maxSize = 10000; public void putNettyEvent(final NettyEvent event) { if (this.eventQueue.size() <= maxSize) { this.eventQueue.add(event); } else { log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); } } @Override public void run() { log.info(this.getServiceName() + " service started"); final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener(); while (!this.isStopped()) { try { NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); if (event != null && listener != null) { switch (event.getType()) { case IDLE: listener.onChannelIdle(event.getRemoteAddr(), event.getChannel()); break; case CLOSE: listener.onChannelClose(event.getRemoteAddr(), event.getChannel()); break; case CONNECT: listener.onChannelConnect(event.getRemoteAddr(), event.getChannel()); break; case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; default: break; } } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return NettyEventExecutor.class.getSimpleName(); } } }