Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.FromAsync produces unobserved task exceptions #1256

Closed
lezzi opened this issue Jul 22, 2020 · 12 comments · Fixed by #1914
Closed

Observable.FromAsync produces unobserved task exceptions #1256

lezzi opened this issue Jul 22, 2020 · 12 comments · Fixed by #1914
Milestone

Comments

@lezzi
Copy link

lezzi commented Jul 22, 2020

Context

Recently I discovered an uncertain behavior in Observable.FromAsync method which causes unobserved exceptions to be thrown.

When there is an active subscription and task throws an exception everything works as expected - exception is catched by RX and forwarded to onError handler. The problem happens when subscription is cancelled right before the task failure. Under the hood (inside SlowTaskObservable) RX cancels task continuation, Exception property is not accessed by anyone and when GC is triggered - task unobserved exception is thrown.

My main concerns here are next:

  • This behavior is inconsistent from all other non-task cases. For example if exception is thrown from Select (or anywhere else) and if subscription is cancelled at this point - exception is not forwarded anywhere and is "silently" ignored.
  • RX is responsible for the task creation and by default always accessed Exception property. But when subscription is cancelled behavior changes completely.
  • It basically means there is no 100% reliable way to get rid of TaskScheduler.UnobservedTaskException. Any Observable.FromAsync usage can cause an unobserved exception. This part is critical in my view.

How I expect it to work: if RX creates a task and takes responsibility for error handling, it should do it fully from the beginning to the end. Behavior has to be consistent, no subscription means we are no longer interested in errors and we shouldn't get them anywhere.

@lioobayoyo
Copy link

lioobayoyo commented Jun 25, 2021

Has been a bit of a headache as well.

Solution I found in my specific case was to use FromAsync(...).Materialize(), it kinda streamlines the threading pasta and allows for proper .Catch to work.

Unsure if it always works, though...

@ChrisPulman
Copy link
Contributor

This issue cant be solved by using the Materialize() option for all cases, the Observable just completes and does not handle Async Cancellation correctly, it should always produce a result.

@ChrisPulman
Copy link
Contributor

@clairernovotny it would really help if you can add fixing this to a higher priority list and make a public release with the fix. Many thanks in advance.

@clairernovotny
Copy link
Member

@bartdesmet ?

@ChrisPulman
Copy link
Contributor

ChrisPulman commented Aug 22, 2021

        [Fact]
        public async Task ObservableFromAsyncHandlesCancellation()
        {
            var statusTrail = new List<(int, string)>();
            var position = 0;
            Exception exception = null;
            var fixture = Observable.FromAsync(async (token) =>
            {
                var ex = new Exception();
                statusTrail.Add((position++, "started command"));
                try
                {
                    await Task.Delay(7000, token).ConfigureAwait(true);
                }
                catch (OperationCanceledException oce)
                {
                    statusTrail.Add((position++, "starting cancelling command"));
                    // dummy cleanup
                    await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
                    statusTrail.Add((position++, "finished cancelling command"));
                    throw new Exception("break execution", oce);
                }

                statusTrail.Add((position++, "finished command Normally"));
                throw new Exception("break execution");
            }).Catch<Unit, Exception>(
                        ex =>
                        {
                            //// should be OperationCanceledException
                            //// OR user code exception

                            exception = ex;
                            statusTrail.Add((position++, "Exception Should Be here"));
                            return Observable.Throw<Unit>(ex);
                        }).Finally(() => statusTrail.Add((position++, "Should ONLY come here Last")));

            var cancel = fixture.Subscribe();

            if (true)
            {
                //// This operation is not as expected

                await Task.Delay(500).ConfigureAwait(true);
                cancel.Dispose();

                // Wait 5050 ms to allow execution and cleanup to complete
                await Task.Delay(5050).ConfigureAwait(false);

                Assert.True(statusTrail.Select(x => x.Item2).Contains("finished cancelling command"));
                Assert.Equal("Should ONLY come here Last", statusTrail.Last().Item2);

                //// (0, "started command")
                //// (1, "Should ONLY come here Last")
                //// (2, "starting cancelling command")
                //// (3, "finished cancelling command")
            }
            else
            {
                //// This operates as expected

                // Wait 7010 ms to allow execution and cleanup to complete
                await Task.Delay(7010).ConfigureAwait(false);
                Assert.True(statusTrail.Select(x => x.Item2).Contains("Exception Should Be here"));
                Assert.Equal("Should ONLY come here Last", statusTrail.Last().Item2);

                //// (0, "started command")
                //// (1, "finished command Normally")
                //// (2, "Exception Should Be here")
                //// (3, "Should ONLY come here Last")
            }
        }

@ChrisPulman
Copy link
Contributor

ChrisPulman commented Aug 22, 2021

        [Fact]
        public async Task ObservableFromAsyncHandlesCancellationWithResult()
        {
            var statusTrail = new List<(int, string)>();
            var position = 0;
            Exception exception = null;
            var fixture = Observable.FromAsync(async (token) =>
            {
                statusTrail.Add((position++, "started command"));
                try
                {
                    await Task.Delay(7000, token).ConfigureAwait(true);
                }
                catch (OperationCanceledException oce)
                {
                    statusTrail.Add((position++, "starting cancelling command"));
                    // dummy cleanup
                    await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
                    statusTrail.Add((position++, "finished cancelling command"));
                    return new Exception("break execution", oce);
                }

                statusTrail.Add((position++, "finished command Normally"));
                return new Exception("break execution");
            }).Catch<Exception, Exception>(
                        ex =>
                        {
                            //// should be OperationCanceledException
                            //// OR user code exception

                            exception = ex;
                            statusTrail.Add((position++, "Exception Should Be here"));
                            return Observable.Throw<Exception>(ex);
                        }).Finally(() => statusTrail.Add((position++, "Should ONLY come here Last")));

            var cancel = fixture.Subscribe(x => exception = x);

            if (true)
            {
                //// This operation is not as expected

                await Task.Delay(500).ConfigureAwait(true);
                cancel.Dispose();

                // Wait 5050 ms to allow execution and cleanup to complete
                await Task.Delay(5050).ConfigureAwait(false);

                Assert.True(statusTrail.Select(x => x.Item2).Contains("finished cancelling command"));
                Assert.True(statusTrail.Select(x => x.Item2).Contains("Exception Should Be here"));
                Assert.Equal("Should ONLY come here Last", statusTrail.Last().Item2);

                //// (0, "started command")
                //// (1, "Should ONLY come here Last")
                //// (2, "starting cancelling command")
                //// (3, "finished cancelling command")
            }
            else
            {
                //// This operates as expected

                // Wait 7010 ms to allow execution and cleanup to complete
                await Task.Delay(7010).ConfigureAwait(false);

                Assert.Equal("Should ONLY come here Last", statusTrail.Last().Item2);
                Assert.Equal("break execution", exception.Message);
                //// (0, "started command")
                //// (1, "finished command Normally")
                //// (2, "Exception Should Be here")
                //// (3, "Should ONLY come here Last")
            }
        }

Hopefully these will help you find the issue

@danielcweber
Copy link
Collaborator

danielcweber commented Aug 26, 2021

Here's a different take on the issue:

The exception must be left unobserved because that's what it is. The subscriber cancelled its subscription before it could observe it. The user's code didn't setup the try-catch handlers to handle the exception. Rx code could observe it but doesn't know how to handle it - observing it just for the sake of observing it (in a catch-em-all approach) is bad style IMO. Thus, it's rightfully considered unobserved and should be treated as such. I disagree that it is Rx duty to handle the exception. Yes, as @lezzi pointed out, it may be responsible for task creation but it's the user's code that throws.

About being completely UnobservedTaskException-event-free: I see that OperationCanceledExceptions are annoying. You might try and implement a ImprovedFromAsync extension that wraps the user's delegate and catches the OCE. For any other unhandled/unexpected exception, I claim that you do want it to bubble up somewhere. It gives you an opportunity to log it and handle more gracefully in your own code - which is observing it. Changing Rx to just ignore the exception will, in the worst case, just hide bugs.

@bartdesmet
Copy link
Collaborator

Note that the following sequence is totally expected:

                //// (0, "started command")
                //// (1, "Should ONLY come here Last")
                //// (2, "starting cancelling command")
                //// (3, "finished cancelling command")

When cancel.Dispose() is called on thread T1, the following steps take place:

`Dispose()` on the `Finally` operator
|
+-- The `Sink` wrapping the user's obsever gets `Dispose`d, causing no further `On*` messages to get through.
|
+-- `Dispose()` is called on its source, i.e. the `Catch` operator
|   |
|   +-- Mute the sink (as above)
|   |
|   +-- `Dispose()` is called on its source, i.e. the `FromAsync` operator
|       |
|       +-- Trigger cancellation on the `CancellationToken`
|           |
|           |  // NB: There's a race condition now with another thread T2 (the delay timer) observing the cancellation
|           |  //     which could trigger (2;3, "starting;finishing cancelling command")
|           |
|           +-- Causes the `ContinueWith` continuation that'd observe exceptions to be unhooked
|
+-- Invoke the callback passed to `Finally` // (1, "Should ONLY come here Last")

That is, the Should ONLY come here Last cannot be guaranteed because cancellation through signaling of a CancellationToken is asynchronous; there's no implied "join" with the Task<T> that's being canceled. Such a "join" would be tricky given that Dispose is synchronous and the only resort would be to do a blocking wait for the running task (in the AsyncRx.NET world, the DisposeAsync could naturally perform such a join by awaiting the task). In the example, that'd prevent further unwinding of the Dispose operation up to the Finally operator. However, the use of such an operator would be limited to only ensuring that Dispose[Async] cannot return until the spawned task has completed; once disposal has been initiated, the outgoing observer is being silenced, so we can't feed any possible Exception down the OnError channel; it'd be dropped anyway. (It was a conscious decision in the initial Rx 1.0 design to silence observers prior to propagating Dispose calls up the chain, rather than the reverse order where during an ongoing Dispose operation the observer may still get called, which could cause tricky "chasing the tail" situations in certain operators, but I digress.)

There are two things left to discuss.

First, OperationCanceledException as mentioned above on the thread. In which case does this exception bubble up through Task.Exception and ends up being not observed? In the context of async methods, this should always translate to a Canceled state rather than a Faulted state:

> async Task G() => throw new OperationCanceledException();
> var t = G();
> t
Task<VoidTaskResult>(Id = 541, Status = Canceled, Method = "{null}", Result = "{Not yet computed}") { AsyncState=null, CancellationPending=false, CreationOptions=None, Exception=null, Id=541, Result=[System.Threading.Tasks.VoidTaskResult], Status=Canceled }
> t.Exception
null

This is part of the async method's behavior to have some awareness of cancellation and tie it up to the Canceled state, see https://referencesource.microsoft.com/#mscorlib/system/runtime/compilerservices/AsyncMethodBuilder.cs,661. In the case we should never have unobserved exceptions.

There are other cases where this translation doesn't happen though, e.g.

> var tcs = new TaskCompletionSource<int>();
> tcs.SetException(new OperationCanceledException());
> tcs.Task
Task<int>(Id = 652, Status = Faulted, Method = "{null}", Result = "{Not yet computed}") { AsyncState=null, CancellationPending=false, CreationOptions=None, Exception=Count = 1, Id=652, Result=0, Status=Faulted }

or

> var t = Task.Run(() => 42).ContinueWith(t => throw new OperationCanceledException());
> t
ContinuationTaskFromResultTask<int>(Id = 605, Status = Faulted, Method = "{null}") { AsyncState=null, CancellationPending=false, CreationOptions=None, Exception=Count = 1, Id=605, Status=Faulted }

But it's unclear to me whether these should be treated differently from any other exception for a Faulted task. (For starters, the .NET libraries do not consider this to be Canceled rather than Faulted.) Which brings us to the gist of the issue here, namely what to do with arbitrary exceptions.

I'm with @danielcweber here from a purity point of view around dropping exceptions, though I can see there's an arguable inconsistency with e.g. an exception thrown from a selector or predicate or whatnot that ends up getting swallowed during disposal of the subscription because the observer has already been "muted". At most, I could see an optional flag (which defaults to the current behavior, as some people may rely on handling these exceptions "globally", so one would have to opt in to such an ignoreExceptionsAfterDispose flag) on these conversions to influence the behavior. The "fix" would likely simply be changing the ContinueWith on the Task<T> returned by the user to optionally wire the cts.Token or CancellationToken.None (thus keeping the continuation attached even in case of disposal and thus causing it to always retrieve the exception upon task completion, only to send it into a "muted" observer).

@ChrisPulman
Copy link
Contributor

@bartdesmet Thanks for your detailed explanation of how it is operating at present, and understandably some may rely on the Finally being hit without any exceptions on the Catch function when a CancellationToken is cancelled.
Observable.FromAsync functionality :
- execute Task => task cancellation occurs => finally => Task continues => Task completes at some unknown point.
OR
- execute Task => exception thrown in user code => Task completes => error => finally.
OR
- execute Task => Task completes => produce result/s (should always return a Unit upon task completion if no value produced) => complete => finally.


However it does not fit to every scenario.
If no cancellation token is specified then the functionality operates as we expect and handles exceptions correctly, the functionality we require is :
- execute Task => exception thrown in user code or task cancellation occurs => Task completes => error => finally.
OR
- execute Task => Task completes => produce result/s (should always return a Unit upon task completion if no value produced) => complete => finally.

In almost every case we want to know when an exception was produced during execution, as the Cancellation causes the execution to stop without completing it should be reported as an error.
Finally should only fire when all execution of the started task/s has been completed otherwise its a maybe finally.
We would like to see a fix or alternate function to this that follows these normal steps.
Observable.FromCancellationTask perhaps....

I trust that you can see our point of view and will be able to provide a suitable function to cater for our needs.

@xleon
Copy link

xleon commented Feb 24, 2022

I'm getting a massive amount of crash reports in Raygun (production) due to this, which was not happening in the past but I don't know what version introduced the issue.

@tomasfil
Copy link

Hello, is it possible to resurrect this? @clairernovotny @bartdesmet @ChrisPulman , since this is hurting ReactiveCommand a lot. I have dealt with like 6 people struggling with this issue within short timespan

@idg10
Copy link
Collaborator

idg10 commented Apr 3, 2023

It looks to me like there are two different proposals on the table:

  1. ignore any post-unsubscribe exceptions from the source Task
  2. report post-unsubscribe exceptions through OnError

I believe 1 is what @lezzi asked for in arguing for consistency with other scenarios, such as when the selector callback for a Select was in progress at the instant of unsubscription, and then throws an exception after Rx has got far enough through processing the unsubscription to 'mute' the sink that wraps the application's observer. If the selector callback throws once we're at that stage then, as @lezzi says, the

exception is not forwarded anywhere and is "silently" ignored

I don't think the argument that such exceptions should be ignored is entirely watertight, even for cases like Select, but it is true that this is what actually happens today with Select. There are arguments one could make in favour of Select reporting this as an unhandled exception. But since it doesn't in fact do that, then the argument of consistency is that FromAsync should also silently swallow the exception in this case.

I think this is effectively what @bartdesmet proposed in his suggestion for an optional ignoreExceptionsAfterDispose flag.

But I think @ChrisPulman is asking for something slightly different:

execute Task => exception thrown in user code or task cancellation occurs => Task completes => error => finally

This actually rolls two behaviours into one statement (presumably to underscore the apparent consistency of this proposal). But we could (and in my view should) separate it back out into:

  1. execute Task => exception thrown in user code => Task completes (faulted) => error => finally
  2. execute Task => task cancellation occurs => Task completes (faulted) => error => finally

The scenario described in 1 is one where we don't unsubscribe, and in that case, the behaviour is already as described. The absence of any unsubscription in that case makes it very different from 2, which is why I think it's better to consider these separately. So it's really just 2—what happens when unsubscription occurs at a critical moment—that's under discussion here. So I think what we have is:

Option Behaviour
Current implementation execute Task => task cancellation occurs => finally => Task continues => Task fails => unobserved exception reported
@lezzi's request / @bartdesmet's ignoreExceptionsAfterDispose suggestion: execute Task => task cancellation occurs => finally => Task continues => Task fails => error ignored
@ChrisPulman's request execute Task => task cancellation occurs => Task continues => Task fails => error => finally

Also, I think that in this thread there has been a tendency to conflate unsubscription (calling Dispose on the IDisposable returned by Subscribe) with cancellation, e.g. from the original post:

The problem happens when subscription is cancelled right before the task failure

This conflation disguises important aspects of the problem because there are two separate things—the call to Dispose and the cancellation of work in the Task returned by the callback passed to FromAsync. (They are related, in that the call to Dispose is what triggers the beginning of the cancellation of the work. But they are separate in that they finish at quite different times.) We could make this explicit by expanding things out in the table:

Option Behaviour
Current implementation execute Task => sub.Dispose called => task cancellation occurs => finally => Dispose returns => Task continues => Task fails => unobserved exception reported
@lezzi's request / @bartdesmet's ignoreExceptionsAfterDispose suggestion: execute Task => sub.Dispose called => task cancellation occurs => finally => Dispose returns => Task continues => Task fails => error ignored
@ChrisPulman's request execute Task => sub.Dispose called => task cancellation occurs => Task continues => Task fails => => error => finally => Dispose returns

(The "error" and "finally" here are copied from @ChrisPulman's message. I am interpreting these as referring to the calls to the callbacks passed to Catch and Finally respectively.)

By showing the call to and, critically, the return from Dispose explicitly here, the tricky aspect of @ChrisPulman's request becomes easier to see: for it to work the way he proposes, the call to Dispose would have to block until cancellation has run its course. (Just to be clear, there are 3 ways that cancellation can run its course. 1: the running task might never actually notice it was cancelled, and just run to completion; 2: cancelation might be honoured and the Task completes in a Canceled state, or 3: an error occurs and the Task completes in a Faulted state.) And because cancellation is a) asynchronous and b) best-effort, that could mean blocking for an arbitrary length of time.

An alternative would be to allow Dispose to return earlier:

execute Task => sub.Dispose called => task cancellation occurs => Dispose returns => Task continues => Task fails => error => finally

but this creates a new problem: in this model, notifications continue to be passed to the observer even after the call to Dispose the subscription has returned! This seems like breaking the protocol of IObservable<T>/IObserver<T>: as I understand the rules, a source isn't supposed to carry on delivering items to an observer after that observer has unsubscribed. So I don't consider this to be a viable option. That leaves us just with the version described in the last table above, in which Dispose might block indefinitely.

So here's where I'm at with my understanding of the two quite different solutions being asked for here:

  1. @lezzi's request and @bartdesmet's suggestion to make it possible for the exception to be ignored seems relatively straightforward, and could be added as a new feature in a backwards compatible way
  2. as far as I can see @ChrisPulman's suggestion requires us either to make Dispose block until the underlying task is either done, faulted, or cancelled

It's not clear to me which, if either, of these two proposals would solve the problems that ReactiveCommand is having.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
9 participants