Adapt behavior for non-recoverable connections/channels #143

Open
acogoluegnes opened this issue Sep 3, 2020 · 9 comments

@acogoluegnes
Contributor

Following up on #142.

In several places the code assumes that connections and channels are recoverable. This does not play well when the connections (and the channels) are not.

protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
    channel.addShutdownListener(reason -> {
        if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
            emitter.complete();
        }
    });
}

For a non-recoverable channel, the complete signal should always be generated, because such a channel will not recover by itself.
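
The decision described above can be sketched in isolation. This is a stand-alone model and not the library's code: `ShutdownPolicy`, `shouldComplete` and both boolean parameters are hypothetical names. In the real method, the second flag would correspond to the result of the recovery-triggering condition applied to the shutdown reason.

```java
// Hypothetical model of the completion decision; not part of reactor-rabbitmq.
final class ShutdownPolicy {

    private ShutdownPolicy() {
    }

    /**
     * @param recoverable      whether the channel can recover on its own (assumed flag)
     * @param recoveryExpected whether the shutdown reason would trigger automatic recovery
     * @return true if the complete signal should be emitted to the subscriber
     */
    static boolean shouldComplete(boolean recoverable, boolean recoveryExpected) {
        if (!recoverable) {
            // A non-recoverable channel never comes back, so always complete.
            return true;
        }
        // A recoverable channel completes only when no recovery will follow.
        return !recoveryExpected;
    }
}
```

With this rule, a recoverable channel keeps its current behavior, and only the non-recoverable case changes.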

The connectionMono should not be cached either: a non-recoverable connection will not recover, so the connectionMono must provide a new instance. This would make it possible to use the connectionMonoConfigurator to add some retry logic instead of providing a full connectionMono.
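
To illustrate the "new instance plus retry" idea without Reactor, here is a minimal plain-Java sketch. `FreshConnections` and `openWithRetry` are hypothetical names and the `Supplier` stands in for whatever creates the connection. A real configurator would more likely use Reactor's retry operators and add a back-off between attempts.

```java
import java.util.function.Supplier;

// Hypothetical helper: asks the factory for a brand-new instance on every attempt.
final class FreshConnections {

    private FreshConnections() {
    }

    static <C> C openWithRetry(Supplier<C> factory, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Nothing is cached: a dead connection is never handed out again.
                return factory.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("no connection after " + maxAttempts + " attempts", last);
    }
}
```

Because nothing is stored between calls, every caller gets either a freshly created instance or an error once the attempts are exhausted.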

There may be other places where checking whether the connection/channel is recoverable would be appropriate.

@acogoluegnes acogoluegnes changed the title Consider not caching non-recoverable connections/channels Adapt behavior for non-recoverable connections/channels Sep 3, 2020
acogoluegnes added a commit that referenced this issue Sep 8, 2020
acogoluegnes added a commit that referenced this issue Sep 8, 2020
References #143

(cherry picked from commit 602ea05)
@a701440

a701440 commented Sep 11, 2020

There is one more item that needs checking. Every "send" operation in the Sender is concluded by closing the channel by default (default implementation of the channelCloseHandler).

.doFinally(st -> channelCloseHandler.accept(st, channel))

It's not clear how that is supposed to interact with Mono, etc.
Also why close after every send? Most of the time people send many messages.

@a701440

a701440 commented Sep 11, 2020

One more item I noticed is that default SendOptions uses exceptionHandler with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE. This also creates some "effects" when using retry with connections.

@acogoluegnes
Contributor Author

> There is one more item that needs checking. Every "send" operation in the Sender is concluded by closing the channel by default (default implementation of the channelCloseHandler).
>
> .doFinally(st -> channelCloseHandler.accept(st, channel))
>
> It's not clear how that is supposed to interact with Mono, etc.
> Also why close after every send? Most of the time people send many messages.

The channel is closed when the flux terminates, so indeed on each send call, but not on every outbound message. The library assumes fluxes are long-lived or even infinite.
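
A rough plain-Java analogy of that lifecycle, with a list standing in for the flux. `SendLifecycle` and `SimpleChannel` are made up for this illustration and are not library types.

```java
import java.util.List;

// Hypothetical stand-in for a send call: one channel per call, closed once at the end.
final class SendLifecycle {

    /** Minimal channel abstraction used only for this illustration. */
    interface SimpleChannel {
        void publish(String message);

        void close();
    }

    private SendLifecycle() {
    }

    static void send(SimpleChannel channel, List<String> messages) {
        try {
            for (String message : messages) {
                channel.publish(message); // many messages share the same channel
            }
        } finally {
            channel.close(); // runs once, when the whole sequence terminates
        }
    }
}
```

Sending three messages in one call publishes three times and closes once. Calling it with a single message each time, as with send(Mono.just(message)), closes the channel after every message.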

@acogoluegnes
Contributor Author

> One more item I noticed is that default SendOptions uses exceptionHandler with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE. This also creates some "effects" when using retry with connections.

What do you suggest? I don't think we can disable retry by default when the channel is not recoverable, because it could break existing code (it's unlikely, but still). Do you still retry when using non-recoverable connections? If so, I guess you provide your own exception handler (that's what options are for). If not, you again provide your own exception handler, using the options the way they are meant to be used.

I don't see any default behavior that would fit recoverable and non-recoverable resources at the same time in this case.

If you have suggestions, I'm open to the introduction of ready-to-use utilities that would fit most non-recoverable resources use cases though.

@a701440

a701440 commented Sep 17, 2020

I am not quite sure what the best API would be. For example, when using sender.send with a Flux of messages, it's hard to get a success/failure confirmation for each specific message. send returns then(), so the only way to get the RabbitMQ error is to install an exception handler, and it's hard to get positive confirmation that a message was sent. So in most cases I see code using sender.send(Mono.just(message)), which allows subscribing to the OK and error conditions. Maybe a better way would be to return a publisher of results from send that would have OK or an Exception for each incoming message.
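
The shape of such a result stream could look roughly like the sketch below. It is plain Java with a `List` standing in for the publisher, and `SendResult`, `ResultReportingSender` and the `Consumer` used as publish function are all hypothetical, not existing reactor-rabbitmq types. An OK here only means the publish call did not throw, not that the broker confirmed the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-message outcome; not an existing reactor-rabbitmq type.
final class SendResult {
    final String message;
    final RuntimeException error; // null when the publish call succeeded

    SendResult(String message, RuntimeException error) {
        this.message = message;
        this.error = error;
    }

    boolean isOk() {
        return error == null;
    }
}

final class ResultReportingSender {

    private ResultReportingSender() {
    }

    // Returns one result per incoming message, in the same order.
    static List<SendResult> send(List<String> messages, Consumer<String> publisher) {
        List<SendResult> results = new ArrayList<>();
        for (String message : messages) {
            try {
                publisher.accept(message);
                results.add(new SendResult(message, null));
            } catch (RuntimeException e) {
                // The failure is recorded for this message only; later ones are still sent.
                results.add(new SendResult(message, e));
            }
        }
        return results;
    }
}
```

A caller can then pair each outcome with its message instead of relying on a single terminal signal for the whole batch.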

@acogoluegnes
Contributor Author

You mean something like sendWithPublishConfirm, but where the returned flux would just tell whether the basicPublish method failed or not? This is indeed more reactive-like I guess, but it won't provide as strong a guarantee as the publish confirm flavor. I think this is worth considering. In theory, we could change the signature to return something, it's not a breaking change.

@a701440

a701440 commented Sep 24, 2020

Probably yes. In addition, the Mono<Void> (from then()) that is currently returned is "hard" to work with in Reactor.
It does not trigger doOnSuccess or subscribe(ok -> {}) handlers since there is no "Void" instance.

@acogoluegnes
Contributor Author

acogoluegnes commented Sep 25, 2020

My mistake, it would be a breaking change, so it'll have to go in 2.0.

@acogoluegnes
Contributor Author

I created a follow-up issue, #146. Don't hesitate to post some ideas about what the returned flux should contain.
