Akka笔记之消息传递

Published: 10 Oct 2014 Category: akka

在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor。在第二篇当中,我们来看一下Actor消息传递的功能。这里还是延用之前使用的那个学生-老师的例子。

在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序。

仔细回顾下学生-老师模型

我们现在只考虑StudentSimulatorApp发送给TeacherActor的消息。这里我所说的StudentSimulatorApp指的是一个正常的主程序。

图片。

从图中可以看到: (如果有陌生的术语,没关系,后面我们会详细解释的)

  1. 学生创建了一个叫ActorSystem的东西。
  2. 他通过ActorSystem来创建了一个叫ActorRef的对象。QuoteRequest消息就是发送给ActorRef的(它是TeacherActor的一个代理)
  3. ActorRef将消息发送给Dispatcher
  4. Dispatcher将消息投递到目标Actor的邮箱中。
  5. 随后Dispatcher将Mailbox扔给一个线程去执行(这点下节会重点讲到)
  6. MailBox将消息出队并最终将其委托给真实的Teacher Actor的接收方法去处理。

正如我所说的,看不懂也别担心。现在我们来一步步地详细地分析下。全部讲完后你可以再回过头来看下这五个步骤。

STUDENTSIMULATORAPP程序

我们用这个STUDENTSIMULATORAPP来启动JVM并初始化ActorSystem。

图片

从图中可以看到,StudentSimulatorApp

  1. 创建了一个ActorSystem
  2. 通过ActorSystem创建了一个Teacher Actor的代理(ActorRef)
  3. 将QuoteRequest消息发送给代理

我们现在只关注这三点。

1. 创建ActorSystem

ActorSystem是进入到Actor的世界的一扇大门。通过它你可以创建或中止Actor。甚至还可以把整个Actor环境给关闭掉。

另一方面来说,Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object或者scala.Any的角色——也就是说,它是所有Actor的根对象。当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。

图片

初始化ActorSystem的代码是这样的:

val system=ActorSystem("UniversityMessageSystem")

UniversityMessageSystem只是你给ActorSystem起的一个可爱的名字而已。

2. 创建一个TeacherActor的代理?

我们来看下下面这段代码:

val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

actorOf是ActorSystem中创建Actor的方法。但是正如你所看到的,它并不会返回我们所需要的TeacherActor。它返回的是一个ActorRef。

这个ActorRef扮演了真实的Actor的一个代理的角色。客户端并不会直接和Actor通信。这也正是Actor模型中避免直接访问TeacherActor中任何的自定义/私有方法或者变量的一种方式。

再重复一遍,消息只会发送给ActorRef,最终才会到达真正的Actor。你是绝对无法直接和Actor进行通信的。如果你真的找到了什么拙劣的方式来直接通信,大家会恨你入骨的。

image

将消息发送给代理

还是只有一行代码。你只需告诉说把QuoteRequest消息发送到ActorRef就好了。Actor中的这个告诉的方式就是一个!号。(ActorRef中确实也有一个tell方法,不过它只是把这个调用委托给了!号)

//send a message to the Teacher Actor
teacherActorRef!QuoteRequest

这就可以了!

如果你认为我在骗你的话,看一下下面StudentSimulatorApp的完整代码:

STUDENTSIMULATORAPP.SCALA

package me.rerun.akkanotes.messaging.actormsg1

import akka.actor.ActorSystem  
import akka.actor.Props  
import akka.actor.actorRef2Scala  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._


object StudentSimulatorApp extends App{

  //Initialize the ActorSystem
  val actorSystem=ActorSystem("UniversityMessageSystem")

  //construct the Teacher Actor Ref
  val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])

  //send a message to the Teacher Actor
  teacherActorRef!QuoteRequest

  //Let's wait for a couple of seconds before we shut down the system
  Thread.sleep (2000) 

  //Shut down the ActorSystem.
  actorSystem.shutdown()

} 

好吧,我承认我撒了点小谎。你还得关掉ActorSystem,不然JVM会一直运行下去的。我还让主线程睡眠了一小会儿,以便给点时间让TeacherActor去完成它的任务。我知道这听起来很愚蠢。别担心。后面我们会通过些优雅的测试用例来替换掉这种取巧的方式。

消息

我们刚发送了一个QuoteMessage给ActorRef,但是,还压根儿没看着过这个消息类呢!

说曹操,曹操到:

(实践中推荐你把消息封装成一个好点的对象,这样维护起来容易些)

TeacherProtocol

package me.rerun.akkanotes.messaging.protocols

object TeacherProtocol{

  case class QuoteRequest()
  case class QuoteResponse(quoteString:String)

}

正如你所想的那样,QuoteRequest就是发送给TeacherActor的那个消息。Actor会回复一个QuoteResponse。

分发器及邮箱

ActorRef把消息处理功能委托给了Dispatcher。实际上,当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。我们来看下它们是干什么的。

图片

邮箱

每个Actor都有一个MailBox(后面会介绍一种特殊的情况)。在我们这个比喻当中,每个老师也有一个邮箱。老师得去检查邮箱并处理消息。在Actor的世界中,则是另一种形式——邮箱一有机会就会要求Actor去完成自己的任务。

同样的,邮箱里也有一个队列来以FIFO的方式来存储并处理消息——它和实际的邮箱还有点不同,真实的邮箱新的信总是在最上面的。

现在讲到分发器了

Dispatcher会完成一些很酷的事。从它的角度来看,它只是从ActorRef中取出一条消息然后将它传给了MailBox。但是,在这后面发生了一件不可意义的事情:

Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。

看下Dispatcher里面的一段代码:

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {  
    ...
    try {
        executorService execute mbox
    ...
}

什么,你是说要执行一下邮箱?

是的。我们看到MailBox中包含了队列里面的消息。由于Executor得去执行MailBox,所以它得是一个Thread类型。是的没错。MailBox的声明及构造器就是这样的。

下面是MailBox的签名信息。

private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable

TeacherActor

图片

当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。

当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。

TeacherActor只是一个很简单的类,它有一个名言的列表,而receive方法很明显就是用来处理消息的。

来看下代码:

TeacherActor.scala

package me.rerun.akkanotes.messaging.actormsg1

import scala.util.Random

import akka.actor.Actor  
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

/*
 * Your Teacher Actor class. 
 * 
 * The class could use refinement by way of  
 * using ActorLogging which uses the EventBus of the Actor framework
 * instead of the plain old System out
 * 
 */

class TeacherActor extends Actor {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  def receive = {

    case QuoteRequest => {

      import util.Random

      //Get a random Quote from the list and construct a response
      val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))

      println (quoteResponse)

    }

  }

}

TeacherActor的receive方法的模式匹配只会匹配一种消息——QuoteRequest (事实上,模式匹配中最好匹配下默认的情况,不过这个就说来话长了)

receive方法做的就是

  1. 匹配QuoteRequest的模式
  2. 从名言列表中随机选取一条
  3. 构造出一个QuoteResponse
  4. 将QuoteResponse打印到控制台上

代码

整个项目的代码可以从Github中下载到。

英文原文链接