diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index 473985d21091b..981a417449f14 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -31,9 +32,23 @@ public class Netty4HttpChannel implements HttpChannel { private final Channel channel; + private final CompletableContext closeContext = new CompletableContext<>(); Netty4HttpChannel(Channel channel) { this.channel = channel; + this.channel.closeFuture().addListener(f -> { + if (f.isSuccess()) { + closeContext.complete(null); + } else { + Throwable cause = f.cause(); + if (cause instanceof Error) { + Netty4Utils.maybeDie(cause); + closeContext.completeExceptionally(new Exception(cause)); + } else { + closeContext.completeExceptionally((Exception) cause); + } + } + }); } @Override @@ -65,6 +80,16 @@ public InetSocketAddress getRemoteAddress() { return (InetSocketAddress) channel.remoteAddress(); } + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + @Override public void close() { channel.close(); @@ -73,4 +98,12 @@ public void close() { public Channel getNettyChannel() { return channel; } + + @Override + public String toString() { + return "Netty4HttpChannel{" + + "localAddress=" + getLocalAddress() + + ", remoteAddress=" + getRemoteAddress() + + '}'; + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 4547a63a9a278..124bd607ab7ae 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -29,6 +29,8 @@ import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.transport.netty4.Netty4Utils; +import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY; + @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { @@ -40,7 +42,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) throws Exception { - Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); + Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); try { @@ -75,7 +77,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest serverChannels = new ArrayList<>(); - // package private for testing - Netty4OpenChannelsHandler serverOpenChannels; - - private final Netty4CorsConfig corsConfig; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, @@ -216,8 +210,6 @@ public Settings settings() { protected void doStart() { boolean success = false; try { - this.serverOpenChannels = new Netty4OpenChannelsHandler(logger); - serverBootstrap = new ServerBootstrap(); serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, @@ -281,10 +273,9 @@ static Netty4CorsConfig buildCorsConfig(Settings settings) { builder.allowCredentials(); } String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ","); - HttpMethod[] methods = Arrays.asList(strMethods) - .stream() + HttpMethod[] methods = Arrays.stream(strMethods) .map(HttpMethod::valueOf) - .toArray(size -> new HttpMethod[size]); + .toArray(HttpMethod[]::new); return builder.allowedRequestMethods(methods) .maxAge(SETTING_CORS_MAX_AGE.get(settings)) .allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ",")) @@ -327,15 +318,21 @@ protected void doStop() { Netty4Utils.closeChannels(serverChannels); } catch (IOException e) { logger.trace("exception while closing channels", e); + } finally { + serverChannels.clear(); } - serverChannels.clear(); } } - if (serverOpenChannels != null) { - serverOpenChannels.close(); - serverOpenChannels = null; + // TODO: Move all of channel closing to abstract class once server channels are handled + try { + CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); + } catch (Exception e) { + logger.warn("unexpected exception while closing http channels", e); } + httpChannels.clear(); + + if (serverBootstrap != null) { serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly(); @@ -349,38 +346,18 @@ protected void doClose() { @Override public HttpStats stats() { - Netty4OpenChannelsHandler channels = serverOpenChannels; - return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels()); + return new HttpStats(httpChannels.size(), totalChannelsAccepted.get()); } - public Netty4CorsConfig getCorsConfig() { - return corsConfig; - } - - protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + @Override + protected void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { if (logger.isTraceEnabled()) { - logger.trace("Read timeout [{}]", ctx.channel().remoteAddress()); + logger.trace("Http read timeout {}", channel); } - ctx.channel().close(); + CloseableChannel.closeChannel(channel);; } else { - if (!lifecycle.started()) { - // ignore - return; - } - if (!NetworkExceptionHelper.isCloseConnectionException(cause)) { - logger.warn( - (Supplier) () -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", ctx.channel()), - cause); - ctx.channel().close(); - } else { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", ctx.channel()), - cause); - ctx.channel().close(); - } + super.onException(channel, cause); } } @@ -404,9 +381,8 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht @Override protected void initChannel(Channel ch) throws Exception { - Netty4HttpChannel nettyTcpChannel = new Netty4HttpChannel(ch); - ch.attr(HTTP_CHANNEL_KEY).set(nettyTcpChannel); - ch.pipeline().addLast("openChannels", transport.serverOpenChannels); + Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch); + ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)); final HttpRequestDecoder decoder = new HttpRequestDecoder( handlingSettings.getMaxInitialLineLength(), @@ -423,10 +399,11 @@ protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); } if (handlingSettings.isCorsEnabled()) { - ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig())); + ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig)); } ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents)); ch.pipeline().addLast("handler", requestHandler); + transport.serverAcceptedChannel(nettyHttpChannel); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4OpenChannelsHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4OpenChannelsHandler.java deleted file mode 100644 index 2270c90967ff2..0000000000000 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4OpenChannelsHandler.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.netty4; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.metrics.CounterMetric; - -import java.io.IOException; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -@ChannelHandler.Sharable -public class Netty4OpenChannelsHandler extends ChannelInboundHandlerAdapter implements Releasable { - - final Set openChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - final CounterMetric openChannelsMetric = new CounterMetric(); - final CounterMetric totalChannelsMetric = new CounterMetric(); - - final Logger logger; - - public Netty4OpenChannelsHandler(Logger logger) { - this.logger = logger; - } - - final ChannelFutureListener remover = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - boolean removed = openChannels.remove(future.channel()); - if (removed) { - openChannelsMetric.dec(); - } - if (logger.isTraceEnabled()) { - logger.trace("channel closed: {}", future.channel()); - } - } - }; - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("channel opened: {}", ctx.channel()); - } - final boolean added = openChannels.add(ctx.channel()); - if (added) { - openChannelsMetric.inc(); - totalChannelsMetric.inc(); - ctx.channel().closeFuture().addListener(remover); - } - - super.channelActive(ctx); - } - - public long numberOfOpenChannels() { - return openChannelsMetric.count(); - } - - public long totalChannels() { - return totalChannelsMetric.count(); - } - - @Override - public void close() { - try { - Netty4Utils.closeChannels(openChannels); - } catch (IOException e) { - logger.trace("exception while closing channels", e); - } - openChannels.clear(); - } - -} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index efa296b6278af..760ac1253c6fe 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -91,7 +92,7 @@ protected void closeConnectionChannel(Transport transport, Transport.Connection final Netty4Transport t = (Netty4Transport) transport; @SuppressWarnings("unchecked") final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } public void testConnectException() throws UnknownHostException { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java index 088f0e85dde23..255faab5ddad0 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java @@ -36,4 +36,17 @@ public class NioHttpChannel extends NioSocketChannel implements HttpChannel { public void sendResponse(HttpResponse response, ActionListener listener) { getContext().sendMessage(response, ActionListener.toBiConsumer(listener)); } + + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public String toString() { + return "NioHttpChannel{" + + "localAddress=" + getLocalAddress() + + ", remoteAddress=" + getRemoteAddress() + + '}'; + } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index ba51f7c684818..aa0859e6146f2 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -20,22 +20,20 @@ package org.elasticsearch.http.nio; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.timeout.ReadTimeoutException; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -44,6 +42,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.AbstractHttpServerTransport; import org.elasticsearch.http.BindHttpException; +import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.nio.cors.NioCorsConfig; @@ -115,7 +114,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { private final int tcpReceiveBufferSize; private final Set serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set socketChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private NioGroup nioGroup; private HttpChannelFactory channelFactory; private final NioCorsConfig corsConfig; @@ -156,7 +154,7 @@ protected void doStart() { int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount, - (s) -> new EventHandler(this::nonChannelExceptionCaught, s)); + (s) -> new EventHandler(this::onNonChannelException, s)); channelFactory = new HttpChannelFactory(); this.boundAddress = createBoundHttpAddress(); @@ -187,12 +185,13 @@ protected void doStop() { } } + // TODO: Move all of channel closing to abstract class once server channels are handled try { - closeChannels(new ArrayList<>(socketChannels)); + CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true); } catch (Exception e) { logger.warn("unexpected exception while closing http channels", e); } - socketChannels.clear(); + httpChannels.clear(); try { nioGroup.close(); @@ -235,38 +234,7 @@ protected TransportAddress bindAddress(InetAddress hostAddress) { @Override public HttpStats stats() { - return new HttpStats(serverChannels.size(), socketChannels.size()); - } - - protected void exceptionCaught(NioSocketChannel channel, Exception cause) { - if (cause instanceof ReadTimeoutException) { - if (logger.isTraceEnabled()) { - logger.trace("Read timeout [{}]", channel.getRemoteAddress()); - } - channel.close(); - } else { - if (lifecycle.started() == false) { - // ignore - return; - } - if (NetworkExceptionHelper.isCloseConnectionException(cause) == false) { - logger.warn( - (Supplier) () -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", channel), - cause); - channel.close(); - } else { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "caught exception while handling client http traffic, closing connection {}", channel), - cause); - channel.close(); - } - } - } - - protected void nonChannelExceptionCaught(Exception ex) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex); + return new HttpStats(serverChannels.size(), totalChannelsAccepted.get()); } static NioCorsConfig buildCorsConfig(Settings settings) { @@ -324,7 +292,7 @@ private void closeChannels(List channels) { } private void acceptChannel(NioSocketChannel socketChannel) { - socketChannels.add(socketChannel); + super.serverAcceptedChannel((HttpChannel) socketChannel); } private class HttpChannelFactory extends ChannelFactory { @@ -342,7 +310,7 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) }; HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this, handlingSettings, corsConfig); - Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); + Consumer exceptionHandler = (e) -> onException(nioChannel, e); SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline, new InboundChannelBuffer(pageSupplier)); nioChannel.setContext(context); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java similarity index 92% rename from plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java rename to plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java index ef2bc875aa994..d700ad567bc19 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java @@ -28,11 +28,11 @@ import java.net.StandardSocketOptions; import java.nio.channels.SocketChannel; -public class TcpNioSocketChannel extends NioSocketChannel implements TcpChannel { +public class NioTcpChannel extends NioSocketChannel implements TcpChannel { private final String profile; - public TcpNioSocketChannel(String profile, SocketChannel socketChannel) throws IOException { + public NioTcpChannel(String profile, SocketChannel socketChannel) throws IOException { super(socketChannel); this.profile = profile; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java similarity index 92% rename from plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java rename to plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java index 946563225c66c..10bf4ed752321 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpServerChannel.java @@ -32,11 +32,11 @@ * This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel} * interface. As it is a server socket, setting SO_LINGER and sending messages is not supported. */ -public class TcpNioServerSocketChannel extends NioServerSocketChannel implements TcpChannel { +public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel { private final String profile; - public TcpNioServerSocketChannel(String profile, ServerSocketChannel socketChannel) throws IOException { + public NioTcpServerChannel(String profile, ServerSocketChannel socketChannel) throws IOException { super(socketChannel); this.profile = profile; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index b85d707dcd934..cf7d37493cb38 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -40,7 +40,6 @@ import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; @@ -78,14 +77,14 @@ protected NioTransport(Settings settings, ThreadPool threadPool, NetworkService } @Override - protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { + protected NioTcpServerChannel bind(String name, InetSocketAddress address) throws IOException { TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name); return nioGroup.bindServerChannel(address, channelFactory); } @Override - protected TcpNioSocketChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { - TcpNioSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory); + protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory); channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); return channel; } @@ -131,19 +130,15 @@ protected void stopInternal() { profileToChannelFactory.clear(); } - protected void exceptionCaught(NioSocketChannel channel, Exception exception) { - onException((TcpChannel) channel, exception); - } - protected void acceptChannel(NioSocketChannel channel) { - serverAcceptedChannel((TcpNioSocketChannel) channel); + serverAcceptedChannel((NioTcpChannel) channel); } protected TcpChannelFactory channelFactory(ProfileSettings settings, boolean isClient) { return new TcpChannelFactoryImpl(settings); } - protected abstract class TcpChannelFactory extends ChannelFactory { + protected abstract class TcpChannelFactory extends ChannelFactory { protected TcpChannelFactory(RawChannelFactory rawChannelFactory) { super(rawChannelFactory); @@ -164,14 +159,14 @@ private TcpChannelFactoryImpl(ProfileSettings profileSettings) { } @Override - public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { - TcpNioSocketChannel nioChannel = new TcpNioSocketChannel(profileName, channel); + public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { + NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel); Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; TcpReadWriteHandler readWriteHandler = new TcpReadWriteHandler(nioChannel, NioTransport.this); - Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); + Consumer exceptionHandler = (e) -> onException(nioChannel, e); BytesChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, readWriteHandler, new InboundChannelBuffer(pageSupplier)); nioChannel.setContext(context); @@ -179,8 +174,8 @@ public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel cha } @Override - public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); + public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); Consumer acceptor = NioTransport.this::acceptChannel; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java index f2d07b180855c..e86653b685820 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java @@ -28,10 +28,10 @@ public class TcpReadWriteHandler extends BytesWriteHandler { - private final TcpNioSocketChannel channel; + private final NioTcpChannel channel; private final TcpTransport transport; - public TcpReadWriteHandler(TcpNioSocketChannel channel, TcpTransport transport) { + public TcpReadWriteHandler(NioTcpChannel channel, TcpTransport transport) { this.channel = channel; this.transport = transport; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index c78ae25e44a06..090fc579c4899 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -96,7 +97,7 @@ protected MockTransportService build(Settings settings, Version version, Cluster protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { @SuppressWarnings("unchecked") TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } public void testConnectException() throws UnknownHostException { diff --git a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java new file mode 100644 index 0000000000000..6b89a90aa2c77 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.network; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public interface CloseableChannel extends Closeable { + + /** + * Closes the channel. For most implementations, this will be be an asynchronous process. For this + * reason, this method does not throw {@link java.io.IOException} There is no guarantee that the channel + * will be closed when this method returns. Use the {@link #addCloseListener(ActionListener)} method + * to implement logic that depends on knowing when the channel is closed. + */ + @Override + void close(); + + /** + * Adds a listener that will be executed when the channel is closed. If the channel is still open when + * this listener is added, the listener will be executed by the thread that eventually closes the + * channel. If the channel is already closed when the listener is added the listener will immediately be + * executed by the thread that is attempting to add the listener. + * + * @param listener to be executed + */ + void addCloseListener(ActionListener listener); + + /** + * Indicates whether a channel is currently open + * + * @return boolean indicating if channel is open + */ + boolean isOpen(); + + /** + * Closes the channel without blocking. + * + * @param channel to close + */ + static void closeChannel(C channel) { + closeChannel(channel, false); + } + + /** + * Closes the channel. + * + * @param channel to close + * @param blocking indicates if we should block on channel close + */ + static void closeChannel(C channel, boolean blocking) { + closeChannels(Collections.singletonList(channel), blocking); + } + + /** + * Closes the channels. + * + * @param channels to close + * @param blocking indicates if we should block on channel close + */ + static void closeChannels(List channels, boolean blocking) { + try { + IOUtils.close(channels); + } catch (IOException e) { + // The CloseableChannel#close method does not throw IOException, so this should not occur. + throw new UncheckedIOException(e); + } + if (blocking) { + ArrayList> futures = new ArrayList<>(channels.size()); + for (final C channel : channels) { + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + channel.addCloseListener(closeFuture); + futures.add(closeFuture); + } + blockOnFutures(futures); + } + } + + static void blockOnFutures(List> futures) { + for (ActionFuture future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + // Ignore as we are only interested in waiting for the close process to complete. Logging + // close exceptions happens elsewhere. + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 4fad4159f55d8..9d9008f7fb879 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -21,12 +21,16 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.transport.PortsRange; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; @@ -41,9 +45,14 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.channels.CancelledKeyException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; @@ -60,11 +69,13 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo protected final Dispatcher dispatcher; private final NamedXContentRegistry xContentRegistry; - protected final String[] bindHosts; - protected final String[] publishHosts; protected final PortsRange port; protected final ByteSizeValue maxContentLength; + private final String[] bindHosts; + private final String[] publishHosts; + protected final AtomicLong totalChannelsAccepted = new AtomicLong(); + protected final Set httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); protected volatile BoundTransportAddress boundAddress; protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, @@ -166,6 +177,49 @@ static int resolvePublishPort(Settings settings, List boundAdd return publishPort; } + protected void onException(HttpChannel channel, Exception e) { + if (lifecycle.started() == false) { + // just close and ignore - we are already stopped and just need to make sure we release all resources + CloseableChannel.closeChannel(channel); + return; + } + if (NetworkExceptionHelper.isCloseConnectionException(e)) { + logger.trace(() -> new ParameterizedMessage( + "close connection exception caught while handling client http traffic, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); + } else if (NetworkExceptionHelper.isConnectException(e)) { + logger.trace(() -> new ParameterizedMessage( + "connect exception caught while handling client http traffic, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); + } else if (e instanceof CancelledKeyException) { + logger.trace(() -> new ParameterizedMessage( + "cancelled key exception caught while handling client http traffic, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); + } else { + logger.warn(() -> new ParameterizedMessage( + "caught exception while handling client http traffic, closing connection {}", channel), e); + CloseableChannel.closeChannel(channel); + } + } + + /** + * Exception handler for exceptions that are not associated with a specific channel. + * + * @param exception the exception + */ + protected void onNonChannelException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); + } + + protected void serverAcceptedChannel(HttpChannel httpChannel) { + boolean addedOnThisCall = httpChannels.add(httpChannel); + assert addedOnThisCall : "Channel should only be added to http channel set once"; + totalChannelsAccepted.incrementAndGet(); + httpChannel.addCloseListener(ActionListener.wrap(() -> httpChannels.remove(httpChannel))); + logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel)); + } + /** * This method handles an incoming http request. * @@ -181,7 +235,7 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt * * @param httpRequest that is incoming * @param httpChannel that received the http request - * @param exception that was encountered + * @param exception that was encountered */ public void incomingRequestError(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) { handleIncomingRequest(httpRequest, httpChannel, exception); @@ -219,7 +273,7 @@ private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChan innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause); } catch (final RestRequest.BadParameterException e) { badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e); - innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel); + innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel); } restRequest = innerRestRequest; } diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index f5924bb239eae..38bf1e751ef9d 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.rest.AbstractRestChannel; @@ -114,7 +115,7 @@ public void sendResponse(RestResponse restResponse) { } if (isCloseConnection()) { - toClose.add(httpChannel); + toClose.add(() -> CloseableChannel.closeChannel(httpChannel)); } ActionListener listener = ActionListener.wrap(() -> Releasables.close(toClose)); diff --git a/server/src/main/java/org/elasticsearch/http/HttpChannel.java b/server/src/main/java/org/elasticsearch/http/HttpChannel.java index baea3e0c3b3c3..ea8d3c276b16d 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpChannel.java +++ b/server/src/main/java/org/elasticsearch/http/HttpChannel.java @@ -20,11 +20,11 @@ package org.elasticsearch.http; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.network.CloseableChannel; import java.net.InetSocketAddress; -public interface HttpChannel extends Releasable { +public interface HttpChannel extends CloseableChannel { /** * Sends a http response to the channel. The listener will be executed once the send process has been @@ -49,10 +49,4 @@ public interface HttpChannel extends Releasable { */ InetSocketAddress getRemoteAddress(); - /** - * Closes the channel. This might be an asynchronous process. There is no guarantee that the channel - * will be closed when this method returns. - */ - void close(); - } diff --git a/server/src/main/java/org/elasticsearch/http/HttpStats.java b/server/src/main/java/org/elasticsearch/http/HttpStats.java index ac7f0d69485fe..4809315ce1810 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpStats.java +++ b/server/src/main/java/org/elasticsearch/http/HttpStats.java @@ -32,9 +32,9 @@ public class HttpStats implements Writeable, ToXContentFragment { private final long serverOpen; private final long totalOpen; - public HttpStats(long serverOpen, long totalOpen) { + public HttpStats(long serverOpen, long totalOpened) { this.serverOpen = serverOpen; - this.totalOpen = totalOpen; + this.totalOpen = totalOpened; } public HttpStats(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 1a022ee9f4856..bc5cc2c92f2cb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -21,17 +21,13 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -43,30 +39,13 @@ * abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport * implementations must return channels that adhere to the required method contracts. */ -public interface TcpChannel extends Releasable { - - /** - * Closes the channel. This might be an asynchronous process. There is no guarantee that the channel - * will be closed when this method returns. Use the {@link #addCloseListener(ActionListener)} method - * to implement logic that depends on knowing when the channel is closed. - */ - void close(); +public interface TcpChannel extends CloseableChannel { /** * This returns the profile for this channel. */ String getProfile(); - /** - * Adds a listener that will be executed when the channel is closed. If the channel is still open when - * this listener is added, the listener will be executed by the thread that eventually closes the - * channel. If the channel is already closed when the listener is added the listener will immediately be - * executed by the thread that is attempting to add the listener. - * - * @param listener to be executed - */ - void addCloseListener(ActionListener listener); - /** * This sets the low level socket option {@link java.net.StandardSocketOptions} SO_LINGER on a channel. @@ -77,13 +56,6 @@ public interface TcpChannel extends Releasable { void setSoLinger(int value) throws IOException; - /** - * Indicates whether a channel is currently open - * - * @return boolean indicating if channel is open - */ - boolean isOpen(); - /** * Returns the local address for this channel. * @@ -107,48 +79,6 @@ public interface TcpChannel extends Releasable { */ void sendMessage(BytesReference reference, ActionListener listener); - /** - * Closes the channel without blocking. - * - * @param channel to close - */ - static void closeChannel(C channel) { - closeChannel(channel, false); - } - - /** - * Closes the channel. - * - * @param channel to close - * @param blocking indicates if we should block on channel close - */ - static void closeChannel(C channel, boolean blocking) { - closeChannels(Collections.singletonList(channel), blocking); - } - - /** - * Closes the channels. - * - * @param channels to close - * @param blocking indicates if we should block on channel close - */ - static void closeChannels(List channels, boolean blocking) { - if (blocking) { - ArrayList> futures = new ArrayList<>(channels.size()); - for (final C channel : channels) { - if (channel.isOpen()) { - PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - channel.addCloseListener(closeFuture); - channel.close(); - futures.add(closeFuture); - } - } - blockOnFutures(futures); - } else { - Releasables.close(channels); - } - } - /** * Awaits for all of the pending connections to complete. Will throw an exception if at least one of the * connections fails. @@ -188,17 +118,4 @@ static void awaitConnected(DiscoveryNode discoveryNode, List> } } - static void blockOnFutures(List> futures) { - for (ActionFuture future : futures) { - try { - future.get(); - } catch (ExecutionException e) { - // Ignore as we are only interested in waiting for the close process to complete. Logging - // close exceptions happens elsewhere. - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index c577fae486744..bd862c19e9c6d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.IntSet; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -479,7 +480,7 @@ public void close() { } boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false; - TcpChannel.closeChannels(channels, block); + CloseableChannel.closeChannels(channels, block); } finally { transportService.onConnectionClosed(this); } @@ -623,7 +624,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c channels.add(channel); } catch (Exception e) { // If there was an exception when attempting to instantiate the raw channels, we close all of the channels - TcpChannel.closeChannels(channels, false); + CloseableChannel.closeChannels(channels, false); throw e; } } @@ -632,7 +633,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c try { TcpChannel.awaitConnected(node, connectionFutures, connectionProfile.getConnectTimeout()); } catch (Exception ex) { - TcpChannel.closeChannels(channels, false); + CloseableChannel.closeChannels(channels, false); throw ex; } @@ -643,7 +644,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c try { version = executeHandshake(node, handshakeChannel, connectionProfile.getHandshakeTimeout()); } catch (Exception ex) { - TcpChannel.closeChannels(channels, false); + CloseableChannel.closeChannels(channels, false); throw ex; } @@ -962,12 +963,12 @@ protected final void doStop() { ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); - TcpChannel.closeChannels(channels, true); + CloseableChannel.closeChannels(channels, true); } serverChannels.clear(); // close all of the incoming channels. The closeChannels method takes a list so we must convert the set. - TcpChannel.closeChannels(new ArrayList<>(acceptedChannels), true); + CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true); acceptedChannels.clear(); @@ -1001,7 +1002,7 @@ protected final void doStop() { protected void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); return; } @@ -1009,20 +1010,20 @@ protected void onException(TcpChannel channel, Exception e) { logger.trace(() -> new ParameterizedMessage( "close connection exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (isConnectException(e)) { logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (e instanceof BindException) { logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (e instanceof CancelledKeyException) { logger.trace(() -> new ParameterizedMessage( "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (e instanceof TcpTransport.HttpOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { @@ -1030,13 +1031,13 @@ protected void onException(TcpChannel channel, Exception e) { final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override protected void innerInnerOnResponse(Void v) { - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } @Override protected void innerOnFailure(Exception e) { logger.debug("failed to send message to httpOnTransport channel", e); - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } }; internalSendMessage(channel, message, closeChannel); @@ -1044,7 +1045,7 @@ protected void innerOnFailure(Exception e) { } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } } @@ -1060,7 +1061,7 @@ protected void onNonChannelException(Exception exception) { protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); - assert addedOnThisCall : "Channel should only be added to accept channel set once"; + assert addedOnThisCall : "Channel should only be added to accepted channel set once"; channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel))); logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 4d4743156c73d..2aec495390b6c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -143,6 +143,16 @@ public InetSocketAddress getRemoteAddress() { return remoteAddress; } + @Override + public void addCloseListener(ActionListener listener) { + + } + + @Override + public boolean isOpen() { + return true; + } + @Override public void close() { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index e9f5f86462f54..84c82f4159dc6 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -65,7 +66,7 @@ protected void closeConnectionChannel(Transport transport, Transport.Connection final MockTcpTransport t = (MockTcpTransport) transport; @SuppressWarnings("unchecked") final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index bd7fddf82b858..cf9eb5d7a8c57 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -97,7 +98,7 @@ protected MockTransportService build(Settings settings, Version version, Cluster protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { @SuppressWarnings("unchecked") TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } public void testConnectException() throws UnknownHostException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index d897d55e5fdc4..ce06712722cd1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -13,6 +13,7 @@ import io.netty.handler.ssl.SslHandler; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -111,7 +112,7 @@ protected ChannelHandler getClientChannelInitializer() { protected void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (SSLExceptionHelper.isNotSslRecordException(e)) { if (logger.isTraceEnabled()) { logger.trace( @@ -119,21 +120,21 @@ protected void onException(TcpChannel channel, Exception e) { } else { logger.warn("received plaintext traffic on an encrypted channel, closing connection {}", channel); } - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (SSLExceptionHelper.isCloseDuringHandshakeException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e); } else { logger.warn("connection {} closed during handshake", channel); } - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else if (SSLExceptionHelper.isReceivedCertificateUnknownException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("client did not trust server's certificate, closing connection {}", channel), e); } else { logger.warn("client did not trust this server's certificate, closing connection {}", channel); } - TcpChannel.closeChannel(channel); + CloseableChannel.closeChannel(channel); } else { super.onException(channel, e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/http/netty4/Netty4HttpMockUtil.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/http/netty4/Netty4HttpMockUtil.java deleted file mode 100644 index 87e3e78cbc4a3..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/http/netty4/Netty4HttpMockUtil.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.http.netty4; - -import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler; - -import static org.mockito.Mockito.mock; - -/** Allows setting a mock into Netty3HttpServerTransport */ -public class Netty4HttpMockUtil { - - /** - * We don't really need to start Netty for these tests, but we can't create a pipeline - * with a null handler. So we set it to a mock for tests. - */ - public static void setOpenChannelsHandlerToMock(Netty4HttpServerTransport transport) throws Exception { - transport.serverOpenChannels = mock(Netty4OpenChannelsHandler.class); - } - -} \ No newline at end of file diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java index ac586c4945794..9667ca675b4c1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransport.java @@ -7,16 +7,16 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslHandler; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.netty4.Netty4HttpServerTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.security.transport.filter.IPFilter; @@ -57,37 +57,36 @@ public SecurityNetty4HttpServerTransport(Settings settings, NetworkService netwo } @Override - protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Netty4Utils.maybeDie(cause); + protected void onException(HttpChannel channel, Exception e) { if (!lifecycle.started()) { return; } - if (isNotSslRecordException(cause)) { + if (isNotSslRecordException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("received plaintext http traffic on a https channel, closing connection {}", - ctx.channel()), cause); + channel), e); } else { - logger.warn("received plaintext http traffic on a https channel, closing connection {}", ctx.channel()); + logger.warn("received plaintext http traffic on a https channel, closing connection {}", channel); } - ctx.channel().close(); - } else if (isCloseDuringHandshakeException(cause)) { + CloseableChannel.closeChannel(channel); + } else if (isCloseDuringHandshakeException(e)) { if (logger.isTraceEnabled()) { - logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", ctx.channel()), cause); + logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e); } else { - logger.warn("connection {} closed during ssl handshake", ctx.channel()); + logger.warn("connection {} closed during ssl handshake", channel); } - ctx.channel().close(); - } else if (isReceivedCertificateUnknownException(cause)) { + CloseableChannel.closeChannel(channel); + } else if (isReceivedCertificateUnknownException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("http client did not trust server's certificate, closing connection {}", - ctx.channel()), cause); + channel), e); } else { - logger.warn("http client did not trust this server's certificate, closing connection {}", ctx.channel()); + logger.warn("http client did not trust this server's certificate, closing connection {}", channel); } - ctx.channel().close(); + CloseableChannel.closeChannel(channel); } else { - super.exceptionCaught(ctx, cause); + super.onException(channel, e); } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 39ce1a0150c4f..5315a944f778d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -14,14 +14,14 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; -import org.elasticsearch.nio.NioSelector; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.nio.NioTcpChannel; +import org.elasticsearch.transport.nio.NioTcpServerChannel; import org.elasticsearch.transport.nio.NioTransport; -import org.elasticsearch.transport.nio.TcpNioServerSocketChannel; -import org.elasticsearch.transport.nio.TcpNioSocketChannel; import org.elasticsearch.transport.nio.TcpReadWriteHandler; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; @@ -95,11 +95,6 @@ protected void acceptChannel(NioSocketChannel channel) { super.acceptChannel(channel); } - @Override - protected void exceptionCaught(NioSocketChannel channel, Exception exception) { - super.exceptionCaught(channel, exception); - } - private class SecurityTcpChannelFactory extends TcpChannelFactory { private final String profileName; @@ -116,11 +111,11 @@ private SecurityTcpChannelFactory(ProfileSettings profileSettings, boolean isCli } @Override - public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { + public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE); SSLEngine sslEngine = sslService.createSSLEngine(profileConfiguration.getOrDefault(profileName, defaultConfig), null, -1); SSLDriver sslDriver = new SSLDriver(sslEngine, isClient); - TcpNioSocketChannel nioChannel = new TcpNioSocketChannel(profileName, channel); + NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel); Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); @@ -128,15 +123,15 @@ public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel cha TcpReadWriteHandler readWriteHandler = new TcpReadWriteHandler(nioChannel, SecurityNioTransport.this); InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); - Consumer exceptionHandler = (e) -> exceptionCaught(nioChannel, e); + Consumer exceptionHandler = (e) -> onException(nioChannel, e); SSLChannelContext context = new SSLChannelContext(nioChannel, selector, exceptionHandler, sslDriver, readWriteHandler, buffer); nioChannel.setContext(context); return nioChannel; } @Override - public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { - TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); + public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException { + NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel); Consumer exceptionHandler = (e) -> logger.error(() -> new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); Consumer acceptor = SecurityNioTransport.this::acceptChannel; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java index 3ef298f3f232d..ec925f43abe79 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; -import org.elasticsearch.http.netty4.Netty4HttpMockUtil; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackSettings; @@ -26,7 +25,6 @@ import org.junit.Before; import javax.net.ssl.SSLEngine; - import java.nio.file.Path; import java.util.Collections; import java.util.Locale; @@ -65,7 +63,6 @@ public void testDefaultClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -82,7 +79,6 @@ public void testOptionalClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -99,7 +95,6 @@ public void testRequiredClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(true)); @@ -116,7 +111,6 @@ public void testNoClientAuth() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); ChannelHandler handler = transport.configureServerChannelHandler(); final EmbeddedChannel ch = new EmbeddedChannel(handler); assertThat(ch.pipeline().get(SslHandler.class).engine().getNeedClientAuth(), is(false)); @@ -131,7 +125,6 @@ public void testCustomSSLConfiguration() throws Exception { SecurityNetty4HttpServerTransport transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); ChannelHandler handler = transport.configureServerChannelHandler(); EmbeddedChannel ch = new EmbeddedChannel(handler); SSLEngine defaultEngine = ch.pipeline().get(SslHandler.class).engine(); @@ -144,7 +137,6 @@ public void testCustomSSLConfiguration() throws Exception { sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); transport = new SecurityNetty4HttpServerTransport(settings, new NetworkService(Collections.emptyList()), mock(BigArrays.class), mock(IPFilter.class), sslService, mock(ThreadPool.class), xContentRegistry(), new NullDispatcher()); - Netty4HttpMockUtil.setOpenChannelsHandlerToMock(transport); handler = transport.configureServerChannelHandler(); ch = new EmbeddedChannel(handler); SSLEngine customEngine = ch.pipeline().get(SslHandler.class).engine(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 0a7ee13b9e296..c5a6a525d4e10 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; @@ -118,7 +119,7 @@ protected MockTransportService build(Settings settings, Version version, Cluster protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { @SuppressWarnings("unchecked") TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } public void testConnectException() throws UnknownHostException {