akka/akka-core

PersistentActor doesn't stop when processing of the RecoveryCompleted message fails

Open

#30439 opened on Jul 27, 2021

Labels: 1 - triaged, help wanted, t:persistence

Description

According to the documentation for the 'receiveRecover' method, "If there is a problem with recovering the state of the actor from the journal, the error will be logged and the actor will be stopped." However, this does not happen if the error is raised while handling the RecoveryCompleted message. In that case the actor fails instead: the exception reaches the parent's supervisor strategy rather than the actor being stopped. The following code demonstrates the problem:

import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.persistence.{PersistentActor, RecoveryCompleted}

class TestActor(fail: Boolean) extends PersistentActor {
  override def receiveRecover: Receive = {
    case RecoveryCompleted if fail => throw new RuntimeException("RecoveryCompleted BOOM!")
  }

  override def receiveCommand: Receive = {
    case msg: Message => persist(msg)(_ => ())
  }

  override def persistenceId: String = "123"
}

case class Message(data: String)

class TestActorSupervisor extends Actor {

  context.actorOf(Props(new TestActor(true)))

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Throwable =>
      println("NOT CAUGHT!!!") // demonstrates that the actor was not stopped properly
      SupervisorStrategy.Stop
  }

  override def receive: Receive = PartialFunction.empty
}

object Main extends App {

  val system = ActorSystem()
  val ref = system.actorOf(Props(new TestActor(false)))
  ref ! Message("test")
  Thread.sleep(500)
  system.actorOf(Props[TestActorSupervisor])

}
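
For comparison, the stop-on-failure behaviour the documentation describes can be approximated by hand. The following is only a sketch under assumptions, not a fix in Akka itself: GuardedActor, onRecoveryCompleted and the persistence id are hypothetical names introduced for illustration, and a journal plugin is assumed to be configured as in the reproduction above. It catches non-fatal errors raised while handling RecoveryCompleted, logs them and stops the actor, so the exception never reaches the supervisor.

```scala
import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted}
import scala.util.control.NonFatal

// Hypothetical actor: guards the RecoveryCompleted handler and stops itself
// on failure instead of letting the exception escalate to the supervisor.
class GuardedActor extends PersistentActor with ActorLogging {

  override def persistenceId: String = "guarded-123" // hypothetical id

  // Hypothetical hook standing in for post-recovery initialisation that may throw.
  private def onRecoveryCompleted(): Unit =
    throw new RuntimeException("RecoveryCompleted BOOM!")

  override def receiveRecover: Receive = {
    case RecoveryCompleted =>
      try onRecoveryCompleted()
      catch {
        case NonFatal(e) =>
          // Mirror the documented behaviour: log the error, then stop the actor.
          log.error(e, "Failure while handling RecoveryCompleted, stopping actor")
          context.stop(self)
      }
  }

  override def receiveCommand: Receive = {
    case msg: String => persist(msg)(_ => ())
  }
}
```

With this guard in place, a parent's supervisorStrategy should not be invoked for failures in the RecoveryCompleted handler, since the exception is handled inside the actor. Fatal errors are deliberately not caught.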
