Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer StreamResetException until response body buffer is fully read. #3960

Merged
merged 1 commit into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -300,6 +301,40 @@ private static OkHttpClient buildHttp2Client() {
response2.close();
}

@Test public void connectionWindowUpdateAfterCanceling() throws Exception {
server.enqueue(new MockResponse()
.setBody(new Buffer().write(new byte[Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE + 1])));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

server.enqueue(new MockResponse()
.setBody("abc"));

Call call1 = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response1 = call1.execute();

// Wait until the server has completely filled the stream and connection flow-control windows.
int expectedFrameCount = Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE / 16384;
int dataFrameCount = 0;
while (dataFrameCount < expectedFrameCount) {
String log = http2Handler.take();
if (log.equals("FINE: << 0x00000003 16384 DATA ")) {
dataFrameCount++;
}
}

// Cancel the call and discard what we've buffered for the response body. This should free up
// the connection flow-control window so new requests can proceed.
call1.cancel();
assertFalse("Call should not have completed successfully.",
Util.discard(response1.body().source(), 1, TimeUnit.SECONDS));

Call call2 = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response2 = call2.execute();
assertEquals("abc", response2.body().string());
}

/** https:/square/okhttp/issues/373 */
@Test @Ignore public void synchronousRequest() throws Exception {
server.enqueue(new MockResponse().setBody("A"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public final class Http2Connection implements Closeable {
// operations must synchronize on 'this' last. This ensures that we never
// wait for a blocking operation while holding 'this'.

private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;

/**
* Shared executor to send notifications of incoming streams. This executor requires multiple
Expand Down
61 changes: 35 additions & 26 deletions okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,35 +332,53 @@ private final class FramingSource implements Source {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);

long read;
long read = -1;
ErrorCode errorCode;
synchronized (Http2Stream.this) {
waitUntilReadable();
checkNotClosed();
if (readBuffer.size() == 0) return -1; // This source is exhausted.
if (closed) {
throw new IOException("stream closed");
}
errorCode = Http2Stream.this.errorCode;

// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
if (readBuffer.size() > 0) {
// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
unacknowledgedBytesRead += read;
}

// Flow control: notify the peer that we're ready for more data!
unacknowledgedBytesRead += read;
if (unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
if (errorCode == null
&& unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a WINDOW_UPDATE
// if the stream isn't in error.
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
}

// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
if (read != -1) {
// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
}
}

return read;
}

if (errorCode != null) {
// We defer throwing the exception until now so that we can refill the connection
// flow-control window. This is necessary because we don't transmit window updates until the
// application reads the data. If we throw this prior to updating the connection
// flow-control window, we risk having it go to 0 preventing the server from sending data.
throw new StreamResetException(errorCode);
}

return read;
return -1; // This source is exhausted.
}

/** Returns once the source is either readable or finished. */
Expand Down Expand Up @@ -427,15 +445,6 @@ void receive(BufferedSource in, long byteCount) throws IOException {
}
cancelStreamIfNecessary();
}

private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
if (errorCode != null) {
throw new StreamResetException(errorCode);
}
}
}

void cancelStreamIfNecessary() throws IOException {
Expand Down