Skip to content

Commit

Permalink
#11932 make succeeded and failed in ICB final + introduce onSuccess
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban authored and olamy committed Jun 27, 2024
1 parent 3091012 commit d1a52cf
Show file tree
Hide file tree
Showing 16 changed files with 49 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ protected Action process() throws Throwable
}

@Override
public void succeeded()
protected void onSuccess()
{
boolean proceed = true;
if (committed)
Expand Down Expand Up @@ -588,8 +588,6 @@ else if (expect100)
// There was some concurrent error, terminate.
complete = true;
}

super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,9 @@ protected Action process() throws Exception
}

@Override
public void succeeded()
protected void onSuccess()
{
release();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
release();
super.failed(x);
}

@Override
Expand All @@ -259,6 +251,7 @@ protected void onCompleteSuccess()
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
release();
callback.failed(cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,11 @@ protected void onCompleteSuccess()
}

@Override
public void succeeded()
protected void onSuccess()
{
if (active != null)
active.succeeded();
active = null;
super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ protected Action process() throws Throwable
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("Written {} buffers - entries processed/pending {}/{}: {}/{}",
Expand All @@ -304,7 +304,6 @@ public void succeeded()
processedEntries,
pendingEntries);
finish();
super.succeeded();
}

private void finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,17 +515,15 @@ public void failed(Throwable failure)
}

@Override
public void succeeded()
protected void onSuccess()
{
frameInfo.callback.succeeded();
super.succeeded();
}

@Override
public void failed(Throwable failure)
protected void onCompleteFailure(Throwable cause)
{
frameInfo.callback.failed(failure);
super.failed(failure);
frameInfo.callback.failed(cause);
}

@Override
Expand Down Expand Up @@ -671,17 +669,16 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
frameInfo.callback.succeeded();
super.succeeded();
}

@Override
public void failed(Throwable failure)
protected void onCompleteFailure(Throwable cause)
{
frameInfo.callback.failed(failure);
super.failed(failure);
frameInfo.callback.failed(cause);
}

private void offer(Stream stream, Frame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entries, this);
Expand All @@ -119,8 +119,6 @@ public void succeeded()
entries.clear();

invocationType = InvocationType.NON_BLOCKING;

super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,12 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this);

accumulator.release();

super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this);
Expand All @@ -98,19 +98,17 @@ public void succeeded()

entry.callback.succeeded();
entry = null;

super.succeeded();
}

@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", entry, this, x);
LOG.debug("failed to write {} on {}", entry, this, cause);

accumulator.release();

entry.callback.failed(x);
entry.callback.failed(cause);
entry = null;

// Continue the iteration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,12 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
_buffer = null;
Callback callback = _callback;
_callback = null;
callback.succeeded();
super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,17 +372,9 @@ protected Action process()
}

@Override
public void succeeded()
protected void onSuccess()
{
entry.callback.succeeded();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
entry.callback.failed(x);
super.failed(x);
}

@Override
Expand All @@ -394,10 +386,11 @@ public InvocationType getInvocationType()
@Override
protected void onCompleteFailure(Throwable cause)
{
entry.callback.failed(cause);
QuicConnection.this.close();
}

private class Entry
private static class Entry
{
private final Callback callback;
private final SocketAddress address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,12 +521,11 @@ protected Action process() throws IOException
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("written cipher bytes on {}", QuicSession.this);
cipherBuffer.release();
super.succeeded();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,17 +760,11 @@ else if (filled == 0)
}

@Override
public void succeeded()
protected void onSuccess()
{
if (LOG.isDebugEnabled())
LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this);
buffer.release();
super.succeeded();
}

@Override
protected void onCompleteSuccess()
{
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public void onConnect(EndPoint endPoint)
channels.put(channel.id, channel);

// Register for read interest with the EndPoint.
endPoint.fillInterested(new EndPointToChannelCallback(channel));
EndPointToChannelCallback endPointToChannelCallback = new EndPointToChannelCallback(channel);
endPoint.fillInterested(Callback.from(endPointToChannelCallback::iterate));
}

// Called when there data to read from the Gateway on the given Channel.
Expand Down Expand Up @@ -322,18 +323,10 @@ protected Action process() throws Throwable
endPoint.fillInterested(this);
return Action.IDLE;
}
channel.write(this, buffer);
channel.write(Callback.from(this::iterate), buffer);
return Action.SCHEDULED;
}

@Override
public void succeeded()
{
// There is data to read from the EndPoint.
// Iterate to read it and send it to the Gateway.
iterate();
}

@Override
protected void onCompleteSuccess()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ protected IteratingCallback(boolean needReset)
*/
protected abstract Action process() throws Throwable;

/**
* Invoked when one task has completed successfully.
*/
protected void onSuccess()
{
}

/**
* Invoked when the overall task has completed successfully.
*
Expand Down Expand Up @@ -239,6 +246,7 @@ private void processing()
boolean notifyCompleteSuccess = false;
Throwable notifyCompleteFailure = null;

boolean callOnSuccess = false;
// While we are processing
processing:
while (true)
Expand All @@ -247,6 +255,11 @@ private void processing()
Action action = null;
try
{
if (callOnSuccess)
{
onSuccess();
callOnSuccess = false;
}
action = process();
}
catch (Throwable x)
Expand Down Expand Up @@ -309,6 +322,7 @@ private void processing()
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
// we lost the race, so we have to keep processing
_state = State.PROCESSING;
callOnSuccess = true;
continue;
}

Expand Down Expand Up @@ -342,7 +356,7 @@ else if (notifyCompleteFailure != null)
* to call {@code super.succeeded()}.
*/
@Override
public void succeeded()
public final void succeeded()
{
boolean process = false;
try (AutoLock ignored = _lock.lock())
Expand Down Expand Up @@ -374,7 +388,10 @@ public void succeeded()
}
}
if (process)
{
onSuccess();
processing();
}
}

/**
Expand All @@ -392,7 +409,7 @@ public void succeeded()
* @see #isFailed()
*/
@Override
public void failed(Throwable x)
public final void failed(Throwable x)
{
boolean failure = false;
try (AutoLock ignored = _lock.lock())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest
}

@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
super.failed(x);
onError(x);
onError(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,9 @@ protected void onRequestContent(HttpServletRequest request, Request proxyRequest
}

@Override
public void failed(Throwable x)
protected void onCompleteFailure(Throwable cause)
{
super.failed(x);
onError(x);
onError(cause);
}
}

Expand Down

0 comments on commit d1a52cf

Please sign in to comment.