Akka笔记之Actor监控
在讲到Actor生命周期的时候,我们看到可以用不同的方法来终止一个Actor的运行(通过ActorSystem.stop或者ActorContext.stop甚至是发送一个PoisonPill消息——同样的还有Kill和gracefulStop方法)。
不管Actor是怎么挂掉的,系统里面会有些Actor希望能够知晓这一情况。举个简单的例子,现在有一个和数据库打交道的Actor——我们把它叫作RepositoryActor。显然,系统里面会有其它的一些Actor朝它发送消息。这些对它感兴趣的Actor就能监控这个Actor的状态,看它是否挂掉了。用行话来说就是,DeathWatch。显然ActorContext.watch和ActorContext.unwatch就是监控与取消监控的方法了。进行了监控之后,监控者会收到已停止的Actor发来的一条Terminated消息,它们只需要把这个消息放到receive函数的处理逻辑里就好了。
监控(Watch)与监督(Supervision)不同,监督对Actor的父子关系有着严格的约束,而任何一个Actor都可以对ActorSystem里的其它Actor进行监控。
我们来看一下代码。
QueryRepositoryActor
QueryRepositoryActor里面保存着一个名言的列表,它收到QuoteRepositoryRequest请求后会随机返回其中的一条。 它会记录自己收到了多少消息,如果超过三条的话就"服毒自尽"(发送PoisonPill消息) 代码并无任何特别之处。
package me.rerun.akkanotes.deathwatch
import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._
import scala.util.Random
class QuoteRepositoryActor() extends Actor with ActorLogging {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
var repoRequestCount:Int=1
def receive = {
case QuoteRepositoryRequest => {
if (repoRequestCount>3){
self!PoisonPill
}
else {
//Get a random Quote from the list and construct a response
val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))
log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
repoRequestCount=repoRequestCount+1
sender ! quoteResponse
}
}
}
}
TeacherActorWatcher
同样的,TeacherActorWatcher也就是创建了一个QuoteRepositoryActor然后通过context.watch方法对它进行监控,也没有什么特别的地方。
package me.rerun.akkanotes.deathwatch
import akka.actor.{Terminated, Props, Actor, ActorLogging}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest
class TeacherActorWatcher extends Actor with ActorLogging {
val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
context.watch(quoteRepositoryActor)
def receive = {
case QuoteRequest => {
quoteRepositoryActor ! QuoteRepositoryRequest
}
case Terminated(terminatedActorRef)=>{
log.error(s"Child Actor {$terminatedActorRef} Terminated")
}
}
}
测试用例
这个就有点意思了。坦白说,我压根儿没想过这个居然也能进行测试。akka-testkit太赞了。我们来分析下这里的三个测试用例:
- 断言监控后是否能收到Terminated消息
QuoteRepositoryActor在收到第4条消息后应该给测试用例发送一条Terminated消息。前三条消息是OK的。
"A QuoteRepositoryActor" must {
...
...
...
"send back a termination message to the watcher on 4th message" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //Let's watch the Actor
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message
}
}
- 断言在未监控的情况下不会收到Terminated消息
这里面的一些操作其实和前面那个用例是重复的,只是为了验证下context.unwatch方法。如果我们把testProbe.watch和testProbe.unwatch语句删掉了这个用例也能正常运行。
"not send back a termination message on 4th message if not watched" in {
val quoteRepository=TestActorRef[QuoteRepositoryActor]
val testProbe=TestProbe()
testProbe.watch(quoteRepository) //watching
within (1000 millis) {
var receivedQuotes = List[String]()
(1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
receiveWhile() {
case QuoteRepositoryResponse(quoteString) => {
receivedQuotes = receivedQuotes :+ quoteString
}
}
testProbe.unwatch(quoteRepository) //not watching anymore
receivedQuotes.size must be (3)
println(s"receiveCount ${receivedQuotes.size}")
//4th message
quoteRepository!QuoteRepositoryRequest
testProbe.expectNoMsg() //Not Watching. No Terminated Message
}
}
- 断言TeacherActorWatcher确实收到了Terminated消息
我们通过订阅EventStream来看下是否有某条特定的日志消息,以判断Actor是否确是终止了。
"end back a termination message to the watcher on 4th message to the TeacherActor" in {
//This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
val teacherActor=TestActorRef[TeacherActorWatcher]
within (1000 millis) {
(1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor
EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
teacherActor!QuoteRequest //Send the dangerous 4th message
}
}
}
毫无疑问,EventFilter的pattern属性是需要传入一个正则模式的。pattern="""Child Actor .* Terminated"""是说希望能匹配到一条Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated格式的日志信息。
Github
跟往常一样,代码放到了github上。看一下里面的deathwatch包。