Skip to content

Commit

Permalink
Merge pull request #3960 from square/http2-connection-flow-control
Browse files Browse the repository at this point in the history
Defer StreamResetException until response body buffer is fully read.
  • Loading branch information
swankjesse authored Apr 12, 2018
2 parents b35ad1b + 691a82b commit e593e2e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -300,6 +301,40 @@ private static OkHttpClient buildHttp2Client() {
response2.close();
}

@Test public void connectionWindowUpdateAfterCanceling() throws Exception {
server.enqueue(new MockResponse()
.setBody(new Buffer().write(new byte[Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE + 1])));
server.enqueue(new MockResponse()
.setBody("abc"));

Call call1 = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response1 = call1.execute();

// Wait until the server has completely filled the stream and connection flow-control windows.
int expectedFrameCount = Http2Connection.OKHTTP_CLIENT_WINDOW_SIZE / 16384;
int dataFrameCount = 0;
while (dataFrameCount < expectedFrameCount) {
String log = http2Handler.take();
if (log.equals("FINE: << 0x00000003 16384 DATA ")) {
dataFrameCount++;
}
}

// Cancel the call and discard what we've buffered for the response body. This should free up
// the connection flow-control window so new requests can proceed.
call1.cancel();
assertFalse("Call should not have completed successfully.",
Util.discard(response1.body().source(), 1, TimeUnit.SECONDS));

Call call2 = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response2 = call2.execute();
assertEquals("abc", response2.body().string());
}

/** https:/square/okhttp/issues/373 */
@Test @Ignore public void synchronousRequest() throws Exception {
server.enqueue(new MockResponse().setBody("A"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public final class Http2Connection implements Closeable {
// operations must synchronize on 'this' last. This ensures that we never
// wait for a blocking operation while holding 'this'.

private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;

/**
* Shared executor to send notifications of incoming streams. This executor requires multiple
Expand Down
61 changes: 35 additions & 26 deletions okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,35 +332,53 @@ private final class FramingSource implements Source {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);

long read;
long read = -1;
ErrorCode errorCode;
synchronized (Http2Stream.this) {
waitUntilReadable();
checkNotClosed();
if (readBuffer.size() == 0) return -1; // This source is exhausted.
if (closed) {
throw new IOException("stream closed");
}
errorCode = Http2Stream.this.errorCode;

// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
if (readBuffer.size() > 0) {
// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
unacknowledgedBytesRead += read;
}

// Flow control: notify the peer that we're ready for more data!
unacknowledgedBytesRead += read;
if (unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
if (errorCode == null
&& unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a WINDOW_UPDATE
// if the stream isn't in error.
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
}

// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
if (read != -1) {
// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += read;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
}
}

return read;
}

if (errorCode != null) {
// We defer throwing the exception until now so that we can refill the connection
// flow-control window. This is necessary because we don't transmit window updates until the
// application reads the data. If we throw this prior to updating the connection
// flow-control window, we risk having it go to 0 preventing the server from sending data.
throw new StreamResetException(errorCode);
}

return read;
return -1; // This source is exhausted.
}

/** Returns once the source is either readable or finished. */
Expand Down Expand Up @@ -427,15 +445,6 @@ void receive(BufferedSource in, long byteCount) throws IOException {
}
cancelStreamIfNecessary();
}

private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("stream closed");
}
if (errorCode != null) {
throw new StreamResetException(errorCode);
}
}
}

void cancelStreamIfNecessary() throws IOException {
Expand Down

0 comments on commit e593e2e

Please sign in to comment.