akka/akka-core
View on GitHubInteraction between ExplicitlyTriggeredScheduler & streams timers
Open
#31,389 opened on May 7, 2022
help wantedt:streamt:testing
Description
Hi. I've been playing with ExplicitlyTriggeredScheduler and noticed it doesn't behave as you might expect if you integrate its usage with some of the streams operators. For example, in the code below you'd naively expect the probe to receive a timeout as a result of calling scheduler.timePasses, but no such timeout is received.
import scala.concurrent.duration.DurationInt
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.ExplicitlyTriggeredScheduler
import com.typesafe.config.ConfigFactory
object Reproduction extends App {
implicit val system: ActorSystem = ActorSystem(
"test",
config = Some(ConfigFactory.parseString(
"""akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler""""
))
)
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val probe = Source.never.idleTimeout(10.seconds).runWith(TestSink.probe)
scheduler.timePasses(20.seconds)
// Exception in thread "main" java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsgClass waiting for class akka.stream.testkit.TestSubscriber$OnError
probe.expectSubscriptionAndError()
}
The reason for this is that a number of stream operators, including Idle, work by inspecting the system time when the background timer expires.
There's no documentation suggesting streams should play nicely with ExplicitlyTriggeredScheduler, but I thought I'd raise this for awareness anyway.