Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify http channels and exception handling #31379

Merged
merged 3 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,41 @@ public static void close(final Exception ex, final Iterable<? extends Closeable>
}
}

/**
* Closes all given {@link Closeable}s. Some of the {@linkplain Closeable}s may be null; they are
* ignored. After everything is closed, the method either throws the first {@link RuntimeException} it
* hit while closing with other exceptions added as suppressed, or completes normally if there were
* no exceptions. If the first exception is not a {@linkplain RuntimeException}, it is wrapped in a
* {@linkplain RuntimeException}.
*
* @param objects objects to close
*/
public static void closeAndConvertToRuntimeExceptions(final Iterable<? extends Closeable> objects) {
RuntimeException closingException = null;
for (final Closeable object : objects) {
try {
if (object != null) {
object.close();
}
} catch (RuntimeException e) {
if (closingException == null) {
closingException = e;
} else {
closingException.addSuppressed(e);
}
} catch (IOException e) {
if (closingException == null) {
closingException = new RuntimeException(e);
} {
closingException.addSuppressed(e);
}
}
}
if (closingException != null) {
throw closingException;
}
}

/**
* Closes all given {@link Closeable}s, suppressing all thrown exceptions. Some of the {@link Closeable}s may be null, they are ignored.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,9 @@ protected void doStop() {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
serverChannels.clear();
}
}

Expand All @@ -345,8 +346,7 @@ protected void doClose() {

@Override
public HttpStats stats() {
int serverChannelCount = serverChannels.size();
return new HttpStats(serverChannelCount, httpChannels.size() + serverChannelCount);
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ protected TransportAddress bindAddress(InetAddress hostAddress) {

@Override
public HttpStats stats() {
int serverChannelCount = serverChannels.size();
return new HttpStats(serverChannels.size(), httpChannels.size() + serverChannelCount);
return new HttpStats(serverChannels.size(), totalChannelsAccepted.get());
}

static NioCorsConfig buildCorsConfig(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static <C extends CloseableChannel> void closeChannel(C channel) {
/**
* Closes the channel.
*
* @param channel to close
* @param channel to close
* @param blocking indicates if we should block on channel close
*/
static <C extends CloseableChannel> void closeChannel(C channel, boolean blocking) {
Expand All @@ -83,19 +83,15 @@ static <C extends CloseableChannel> void closeChannel(C channel, boolean blockin
* @param blocking indicates if we should block on channel close
*/
static <C extends CloseableChannel> void closeChannels(List<C> channels, boolean blocking) {
IOUtils.closeAndConvertToRuntimeExceptions(channels);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldnt' it be simpler to do:

try {
  IOUtils.close(channels);
} catch (IOException e) {
  throw new UncheckedIOException(e);
}

???

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I’ll pull out the utils part and do that.

if (blocking) {
ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
if (channel.isOpen()) {
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
channel.close();
futures.add(closeFuture);
}
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
futures.add(closeFuture);
}
blockOnFutures(futures);
} else {
IOUtils.closeWhileHandlingException(channels);
}
}

Expand All @@ -108,7 +104,6 @@ static void blockOnFutures(List<ActionFuture<Void>> futures) {
// close exceptions happens elsewhere.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.CloseableChannel;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
Expand All @@ -72,6 +74,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final String[] bindHosts;
private final String[] publishHosts;

protected final AtomicLong totalChannelsAccepted = new AtomicLong();
protected final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected volatile BoundTransportAddress boundAddress;

Expand Down Expand Up @@ -181,7 +184,7 @@ protected void onException(HttpChannel channel, Exception e) {
return;
}
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
logger.warn(() -> new ParameterizedMessage(
logger.trace(() -> new ParameterizedMessage(
"close connection exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else if (NetworkExceptionHelper.isConnectException(e)) {
Expand All @@ -193,7 +196,7 @@ protected void onException(HttpChannel channel, Exception e) {
"cancelled key exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else {
logger.debug(() -> new ParameterizedMessage(
logger.warn(() -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
}
Expand All @@ -210,7 +213,11 @@ protected void onNonChannelException(Exception exception) {
}

protected void serverAcceptedChannel(HttpChannel httpChannel) {
httpChannels.add(httpChannel);
boolean addedOnThisCall = httpChannels.add(httpChannel);
assert addedOnThisCall : "Channel should only be added to http channel set once";
totalChannelsAccepted.incrementAndGet();
httpChannel.addCloseListener(ActionListener.wrap(() -> httpChannels.remove(httpChannel)));
logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/http/HttpStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class HttpStats implements Writeable, ToXContentFragment {
private final long serverOpen;
private final long totalOpen;

public HttpStats(long serverOpen, long totalOpen) {
public HttpStats(long serverOpen, long totalOpened) {
this.serverOpen = serverOpen;
this.totalOpen = totalOpen;
this.totalOpen = totalOpened;
}

public HttpStats(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ protected void onNonChannelException(Exception exception) {

protected void serverAcceptedChannel(TcpChannel channel) {
boolean addedOnThisCall = acceptedChannels.add(channel);
assert addedOnThisCall : "Channel should only be added to accept channel set once";
assert addedOnThisCall : "Channel should only be added to accepted channel set once";
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
}
Expand Down