akka/akka-core

supervison strategy not applied to flatMapMerge

Open

#23,066 创建于 2017年5月30日

在 GitHub 查看
 (14 评论) (1 反应) (0 负责人)Scala (13,277 star) (3,547 fork)batch import
1 - triagedhelp wantedt:stream

描述

I would expect that the flatMapMerge below behave the same as the mapAsyncUnordered:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapMergeConcatError extends App {
  implicit val system = ActorSystem("Main")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val subFlow = {
    Flow[Int]
      .mapAsyncUnordered(5)(i => Future {
        if (i == 4) sys.error("☠")
        i * 5
      })
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
      .reduce(_ + _)
  }

  val subStreamFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .flatMapMerge(5, m => Source.single(m).mapConcat(identity).via(subFlow))
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val mapAsyncFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
    .mapAsyncUnordered(5)(m => Source.single(m).mapConcat(identity).via(subFlow).runWith(Sink.head))
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.seq)

  val f1 = Await.ready(subStreamFuture, 10.seconds)
  val f2 = Await.ready(mapAsyncFuture, 10.seconds)

  println(s"Using flatMapMerge: $f1")
  println(s"Using mapAsyncUnordered: $f2")

  system.terminate()
}

The output is:

Using flatMapMerge: Future(Failure(java.lang.RuntimeException: ☠)) Using mapAsyncUnordered: Future(Success(Vector(30, 15)))

贡献者指南

supervison strategy not applied to flatMapMerge · akka/akka-core#23066 | Good First Issue