akka/akka-core

Interaction between ExplicitlyTriggeredScheduler & streams timers

Open

#31389 opened on May 7, 2022

(1 comment) (1 reaction) (0 assignees) Scala (3547 forks) batch import
help wanted, t:stream, t:testing

Repository metrics

Stars
 (13,277 stars)
PR merge metrics
 (Average merge 17h 35m) (11 PRs merged in 30 days)

Description

Hi. I've been playing with ExplicitlyTriggeredScheduler and noticed it doesn't behave as you might expect if you integrate its usage with some of the streams operators. For example, in the code below you'd naively expect the probe to receive a timeout as a result of calling scheduler.timePasses, but no such timeout is received.

import scala.concurrent.duration.DurationInt

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.ExplicitlyTriggeredScheduler
import com.typesafe.config.ConfigFactory

object Reproduction extends App {
  implicit val system: ActorSystem = ActorSystem(
    "test",
    config = Some(ConfigFactory.parseString(
      """akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler""""
    ))
  )

  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
  val probe = Source.never.idleTimeout(10.seconds).runWith(TestSink.probe)
  scheduler.timePasses(20.seconds)
  
  // Exception in thread "main" java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsgClass waiting for class akka.stream.testkit.TestSubscriber$OnError
  probe.expectSubscriptionAndError()
}

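The reason for this is that a number of stream operators, including Idle, work by inspecting the real system time when their background timer fires, rather than relying on the scheduler's notion of time. Calling scheduler.timePasses does trigger the timer, but the system clock has barely moved since the stream was materialised, so the operator concludes the timeout has not yet elapsed and never fails the stream.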
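To make the mismatch concrete, here is a minimal, dependency-free sketch of the mechanism described above. It is not Akka's actual code: `VirtualScheduler`, `IdleCheck` and `ClockMismatchDemo` are hypothetical names invented for illustration, and the deadline comparison is only assumed to resemble what the Idle operator does. The only difference between the two checks is which clock they read when the timer fires.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical stand-in for a manually advanced scheduler (not Akka's implementation).
final class VirtualScheduler {
  private var nowNanos = 0L
  private val tasks = mutable.ListBuffer.empty[(Long, () => Unit)]

  // Virtual time, which only moves when timePasses is called.
  def currentNanos: Long = nowNanos

  def scheduleOnce(delay: FiniteDuration)(task: () => Unit): Unit =
    tasks += ((nowNanos + delay.toNanos, task))

  // Advance virtual time and run every task whose due time has been reached.
  def timePasses(amount: FiniteDuration): Unit = {
    nowNanos += amount.toNanos
    val (due, pending) = tasks.toList.partition(_._1 <= nowNanos)
    tasks.clear()
    tasks ++= pending
    due.sortBy(_._1).foreach(_._2.apply())
  }
}

// Hypothetical idle check: the timer is fired by the scheduler,
// but the deadline is compared against whatever `clock` returns.
final class IdleCheck(timeout: FiniteDuration, clock: () => Long) {
  private val deadline = clock() + timeout.toNanos
  var timedOut = false

  def onTimer(): Unit =
    if (deadline - clock() < 0) timedOut = true
}

object ClockMismatchDemo {
  // Returns (wall-clock check timed out, virtual-clock check timed out).
  def run(): (Boolean, Boolean) = {
    val scheduler = new VirtualScheduler
    val wallClockCheck = new IdleCheck(10.seconds, () => System.nanoTime())
    val virtualClockCheck = new IdleCheck(10.seconds, () => scheduler.currentNanos)

    scheduler.scheduleOnce(10.seconds)(() => wallClockCheck.onTimer())
    scheduler.scheduleOnce(10.seconds)(() => virtualClockCheck.onTimer())

    // Both timers fire here, but only virtual time has advanced by 20 seconds.
    scheduler.timePasses(20.seconds)
    (wallClockCheck.timedOut, virtualClockCheck.timedOut)
  }
}
```

Under these assumptions, `ClockMismatchDemo.run()` yields `(false, true)`: both timers fire, but only the check that reads the scheduler's virtual time sees the deadline as passed. The wall-clock check behaves like the reproduction above, where the timer runs and no timeout is reported.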

There's no documentation suggesting streams should play nicely with ExplicitlyTriggeredScheduler, but I thought I'd raise this for awareness anyway.
