typelevel/cats-effect

`OutOfMemoryError` not propagated when `IO` originates from `CompletableFuture`

Open

#4,505 建立於 2025年10月13日

在 GitHub 查看
 (11 留言) (2 反應) (0 負責人)Scala (2,230 star) (572 fork)batch import
:beetle: buggood first issue

描述

In our application, an OutOfMemoryError raised inside a fiber created from a CompletableFuture does not crash the JVM process. Instead, the error is caught and returned as a failed fiber outcome.

This differs from the behavior when the same error is thrown directly from an IO, where the error bubbles up and terminates the process as expected.

Example:

import cats.effect._
import cats.implicits._

import java.util.concurrent.{CompletableFuture, Executor, Executors}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    Resource.fromAutoCloseable(IO(Executors.newFixedThreadPool(1))).use { executor =>
      for {
        pingFiber <- pingIO.start
        // _ <- boomFromIO.start // -> crashes the app (expected)
        _ <- boomFromCompletableFuture(ExecutionContext.fromExecutor(executor)).start // -> does NOT crash the app
        _ <- pingFiber.join
      } yield ExitCode.Success
    }

  private val pingIO =
    (IO.println("ping") *> IO.sleep(1.seconds)).foreverM

  private def boomFromIO: IO[Unit] = IO {
    println("Waiting 2 seconds before boom...")
    Thread.sleep(2000)
    println("Gonna boom!")
    throw new OutOfMemoryError("Boom!")
  }

  private def boomFromCompletableFuture(executor: Executor): IO[Unit] =
    IO.fromCompletableFuture(IO(CompletableFuture.runAsync(() => {
      println("Waiting 2 seconds before boom...")
      Thread.sleep(2000)
      println("Gonna boom!")
      throw new OutOfMemoryError("Boom!")
    }, executor))).void
}

Analysis: The difference seems to come from IO.fromCompletableFuture, which relies on CompletableFuture.handle. Since handle catches all Throwable, the OutOfMemoryError ends up wrapped in the failed outcome instead of escaping and crashing the process.

Question: Is this the intended behavior? If not, should fromCompletableFuture avoid intercepting fatal errors like OutOfMemoryError to align with how IO behaves?

Notes: I experimented by modifying the implementation to re-surface fatal errors in onError, and in that case the OutOfMemoryError bubbled up as expected:

def fromCompletableFuture[F[_], A](fut: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] = F.cont {
    new Cont[F, A, A] {
      def apply[G[_]](implicit G: MonadCancelThrow[G]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] = {
        (resume, get, lift) =>
          G.uncancelable { poll =>
            G.flatMap(poll(lift(fut))) { cf =>
              val go = F.delay {
                cf.handle[Unit] {
                  case (a, null) => resume(Right(a))
                  case (_, NonFatal(t)) =>
                    resume(Left(t match {
                      case e: CompletionException if e.getCause ne null => e.getCause
                      case _ => t
                    }))
                }
              }

              val await = G.onCancel(
                poll(get.onError(_ => G.unit)), // re-surface the OutOfMemoryError to main IO
                // if cannot cancel, fallback to get
                G.ifM(lift(F.delay(cf.cancel(true))))(G.unit, G.void(get))
              )

              G.productR(lift(go))(await)
            }
          }
      }
    }
  }

貢獻者指南