From e8aa54dcd68a43e1be9727e3f90dacf97288e5b7 Mon Sep 17 00:00:00 2001 From: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com> Date: Sat, 27 Jul 2024 09:14:04 +0300 Subject: [PATCH] Fix propagation of interruption from CIO to ZIO (#697) * Fix propagation of interruption from CIO to ZIO * Wrap method in F.delay * Allow interruption of interruption * Fix compiling * Make sure to complete the interruption CB * Simplify logic by making interruption un-cancellable --- .../scala/zio/interop/CatsInteropSpec.scala | 24 ++++++++++++++++++- .../src/main/scala/zio/interop/package.scala | 24 ++++++++++++------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index f83c3c00..26c7e7d6 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -6,6 +6,9 @@ import zio.interop.catz.* import zio.test.* import zio.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean + object CatsInteropSpec extends CatsRunnableSpec { def spec: Spec[Any, Throwable] = suite("Cats interop")( test("cats fiber wrapped in Resource can be canceled") { @@ -203,6 +206,25 @@ object CatsInteropSpec extends CatsRunnableSpec { !exception.get.getMessage.contains("Boxed Exception") && exception.get.getMessage.contains("The fiber was canceled") ) - } + }, + test("CIO propagates interruption to ZIO") { + ZIO.succeedBlocking { + val latch = new CompletableFuture[Unit]() + val ref = new AtomicBoolean(false) + val zioF: CIO[Nothing] = + (ZIO.yieldNow.as(latch.complete(())) *> ZIO.never) + .onInterrupt(ZIO.succeed(ref.set(true))) + .toEffect[CIO] + + val value = zioF.start + .productL(CIO.fromCompletableFuture(CIO(latch))) + .flatMap { (fib: cats.effect.FiberIO[Nothing]) => + fib.cancel *> CIO(ref.get()) + } + .unsafeRunSync() + + assertTrue(value) + } + } @@ TestAspect.nonFlaky(1000) ) } diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 046bfa0d..9736e6f7 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -34,21 +34,27 @@ package object interop { @inline def toEffect[F[_], R, A](rio: RIO[R, A])(implicit R: Runtime[R], F: Async[F], trace: Trace): F[A] = F.defer { val interrupted = new AtomicBoolean(true) - F.async[Exit[Throwable, A]] { cb => - Unsafe.unsafe { implicit unsafe => - val fiber = R.unsafe.fork { + F.asyncCheckAttempt[Exit[Throwable, A]] { cb => + F.delay { + implicit val unsafe: Unsafe = Unsafe.unsafe + + val out = R.unsafe.runOrFork { signalOnNoExternalInterrupt { rio }(ZIO.succeed(interrupted.set(false))) } - fiber.unsafe - .addObserver(exit => cb(Right(exit))) - val cancelerEffect = F.delay { - val _ = fiber.interrupt + out match { + case Left(fiber) => + val completeCb = (exit: Exit[Throwable, A]) => cb(Right(exit)) + fiber.unsafe.addObserver(completeCb) + Left(Some(F.async_ { cb => + fiber.unsafe.addObserver(_ => cb(Right(()))) + fiber.unsafe.removeObserver(completeCb) + fiber.tellInterrupt(Cause.interrupt(fiber.id)) + })) + case Right(v) => Right(v) // No need to invoke the callback, sync resumption will take place } - F.pure(Some(cancelerEffect)) } - }.flatMap { exit => toOutcomeThrowableOtherFiber(interrupted.get())(F.pure(_: A), exit) match { case Outcome.Succeeded(fa) =>