Akka笔记之Actor监控

Published: 05 Nov 2014 Category: akka

在讲到Actor生命周期的时候,我们看到可以用不同的方法来终止一个Actor的运行(通过ActorSystem.stop或者ActorContext.stop甚至是发送一个PoisonPill消息——同样的还有Kill和gracefulStop方法)。

不管Actor是怎么挂掉的,系统里面会有些Actor希望能够知晓这一情况。举个简单的例子,现在有一个和数据库打交道的Actor——我们把它叫作RepositoryActor。显然,系统里面会有其它的一些Actor朝它发送消息。这些对它感兴趣的Actor就能监控这个Actor的状态,看它是否挂掉了。用行话来说就是,DeathWatch。显然ActorContext.watch和ActorContext.unwatch就是监控与取消监控的方法了。进行了监控之后,监控者会收到已停止的Actor发来的一条Terminated消息,它们只需要把这个消息放到receive函数的处理逻辑里就好了。

监控(Watch)与监督(Supervision)不同,监督对Actor的父子关系有着严格的约束,而任何一个Actor都可以对ActorSystem里的其它Actor进行监控。

image

我们来看一下代码。

QueryRepositoryActor

QueryRepositoryActor里面保存着一个名言的列表,它收到QuoteRepositoryRequest请求后会随机返回其中的一条。 它会记录自己收到了多少消息,如果超过三条的话就"服毒自尽"(发送PoisonPill消息) 代码并无任何特别之处。

package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala} 
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._ 
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount:Int=1

  def receive = {

    case QuoteRepositoryRequest => {

      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))

        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }

    }

  }

}

TeacherActorWatcher

同样的,TeacherActorWatcher也就是创建了一个QuoteRepositoryActor然后通过context.watch方法对它进行监控,也没有什么特别的地方。

package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging} 
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest 
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)


  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

测试用例

这个就有点意思了。坦白说,我压根儿没想过这个居然也能进行测试。akka-testkit太赞了。我们来分析下这里的三个测试用例:

  1. 断言监控后是否能收到Terminated消息

QuoteRepositoryActor在收到第4条消息后应该给测试用例发送一条Terminated消息。前三条消息是OK的。

"A QuoteRepositoryActor" must {
    ...
    ...
    ...

    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }
  1. 断言在未监控的情况下不会收到Terminated消息

这里面的一些操作其实和前面那个用例是重复的,只是为了验证下context.unwatch方法。如果我们把testProbe.watch和testProbe.unwatch语句删掉了这个用例也能正常运行。

"not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }
  1. 断言TeacherActorWatcher确实收到了Terminated消息

我们通过订阅EventStream来看下是否有某条特定的日志消息,以判断Actor是否确是终止了。

"end back a termination message to the watcher on 4th message to the TeacherActor" in {

      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]

      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor

        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

毫无疑问,EventFilter的pattern属性是需要传入一个正则模式的。pattern="""Child Actor .* Terminated"""是说希望能匹配到一条Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated格式的日志信息。

Github

跟往常一样,代码放到了github上。看一下里面的deathwatch包。

英文原文链接