描述

Akka 的编程模型本身并不建议阻塞等待返回结果。
可以在 actor 中维护状态来给任务计数,再配合消息回传,从而达到获取最终结果的目的。

本文通过两个版本的示例演示这种做法。

版本1

该版本是通过异步回调的方式获得结果

消息类

1
2
3
4
/** Command that tells the job actor to start the whole download job. */
case object StartJob

/** Request handed to a download worker.
  *
  * @param str the URL of the file to download
  */
final case class DownloadMsg(str: String)

/** Reply a download worker sends back once it has finished its file.
  *
  * @param savePath where the downloaded file was stored
  */
final case class NodeTaskDone(savePath: String)

执行文件下载的actor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package fluffy.mo.akkaCase.downloadFile01

import java.util.concurrent.TimeUnit

import akka.actor.Actor

/** Worker actor that simulates downloading a single file.
  *
  * On a [[DownloadMsg]] it prints a progress line, pauses briefly and replies to the
  * sender with a [[NodeTaskDone]]; any other message is only printed.
  */
class FileDownloaActor extends Actor {
  def receive: Receive = {
    case DownloadMsg(_) =>
      println("下载文件")
      // NOTE(review): this pauses for 300 microseconds — confirm whether milliseconds was intended.
      TimeUnit.MICROSECONDS.sleep(300)
      val reply = NodeTaskDone("下载完成--返回存储位置")
      sender() ! reply

    case other =>
      println(s"FileDownloaActor-- unkown msg-- $other")
  }
}

任务分发的actor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package fluffy.mo.akkaCase.downloadFile01

import akka.actor.{Actor, ActorRef, Props}

import scala.collection.mutable.ArrayBuffer

/** Coordinator actor that fans a list of URLs out to download workers and reports back.
  *
  * On [[StartJob]] it creates one [[FileDownloaActor]] child per URL and remembers who asked.
  * Every [[NodeTaskDone]] reply is collected; once all workers have answered, the collected
  * save paths are sent to the original requester.
  *
  * @param urls the URLs to download, one worker per entry
  */
class DownloadJob(urls: Array[String]) extends Actor {

  private var running = false
  private var taskNum = 0
  private var downloadNum = 0
  private val result = ArrayBuffer[String]()
  private var taskSender: Option[ActorRef] = None

  def receive = {
    // Start the job.
    case StartJob =>
      if (running) {
        throw new IllegalStateException("DownloadJob has already been started")
      }

      running = true
      val requester = sender()
      taskSender = Some(requester)
      taskNum = urls.length

      if (urls.isEmpty) {
        // No worker will ever report back, so answer right away instead of
        // leaving the requester waiting for a reply that cannot arrive.
        requester ! result.clone()
      } else {
        urls.foreach { url =>
          context.actorOf(Props[FileDownloaActor]) ! DownloadMsg(url)
        }
      }

    // Completion callback from a child worker.
    case NodeTaskDone(savePath) =>
      result += savePath
      downloadNum += 1
      if (downloadNum == taskNum) {
        // Tell the requester the job is finished. A copy is sent so the message
        // does not share this actor's mutable buffer with the receiver.
        taskSender.foreach(_ ! result.clone())
      }

    case x => println("DownloadJob-- unkown msg-- " + x)
  }
}

启动任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package fluffy.mo.akkaCase.downloadFile01

import akka.actor.{ActorSystem, Props}

/** Entry point for version 1: asks the job actor to start and handles the reply asynchronously. */
object AppRun extends App {
  import akka.util.Timeout
  import scala.concurrent.duration._
  import scala.util.{Failure, Success}
  import akka.pattern.ask

  implicit val ec: scala.concurrent.ExecutionContext = akka.dispatch.ExecutionContexts.global

  val system = ActorSystem("System")
  val array = Array("http://file.com/aa.txt","http://file.com/bb.txt","http://file.com/cc.txt")

  val actor = system.actorOf(Props(classOf[DownloadJob], array))

  // Upper bound on how long the ask below waits for the job's reply.
  implicit val timeout: Timeout = Timeout(25.seconds)
  val future = actor ? StartJob

  // Handle both outcomes: with `map` alone a failed ask (e.g. a timeout) would be
  // dropped silently and the actor system would never be shut down.
  future.onComplete { outcome =>
    outcome match {
      case Success(result) => println("文件下载完成 " + result)
      case Failure(error)  => println("文件下载失败 " + error)
    }
    system.terminate()
  }
}

版本2

该版本是通过线程阻塞的方式等待结果

消息类

通过 CountDownLatch 完成线程的阻塞与唤醒

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/** Command that tells the job actor to start the whole download job. */
case object StartJob

/** Request handed to a download worker.
  *
  * @param str the URL of the file to download
  */
final case class DownloadMsg(str: String)

/** Reply a download worker sends back once it has finished its file.
  *
  * @param savePath where the downloaded file was stored
  */
final case class NodeTaskDone(savePath: String)

/** Hand-off box shared between the job actor and the thread waiting for the result.
  *
  * The waiting side blocks in [[waitingForResponse]] until the other side calls [[done]].
  *
  * @param turls the URLs the job should download
  */
class JobBox(turls: Array[String]) {
  // Single-use gate: starts at 1 and is opened by one call to `done()`.
  val countDownLatch: CountDownLatch = new CountDownLatch(1)
  val urls: Array[String] = turls
  // Result slot; stays null until a producer assigns it.
  var data: Array[String] = null

  /** Blocks the calling thread until [[done]] has been called, then returns `data`. */
  def waitingForResponse(): Array[String] = {
    countDownLatch.await()
    data
  }

  /** Releases any thread blocked in [[waitingForResponse]]. */
  def done(): Unit = {
    countDownLatch.countDown()
  }
}

执行文件下载的actor

同版本1 无变化

任务分发的actor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package fluffy.mo.akkaCase.downloadFile02

import akka.actor.{Actor, Props}

import scala.collection.mutable.ArrayBuffer

/** Coordinator actor that fans the URLs in a [[JobBox]] out to download workers.
  *
  * On [[StartJob]] it creates one [[FileDownloaActor]] child per URL. Every [[NodeTaskDone]]
  * reply is collected; once all workers have answered, the save paths are stored in the box
  * and the box is marked done, which releases whoever is blocked waiting on it.
  *
  * @param box carries the URLs in and the collected save paths out
  */
class DownloadJob(box: JobBox) extends Actor {

  private var running = false
  private var taskNum = 0
  private var downloadNum = 0
  private val result = ArrayBuffer[String]()

  def receive = {
    // Start the job.
    case StartJob =>
      if (running) {
        throw new IllegalStateException("DownloadJob has already been started")
      }

      running = true
      taskNum = box.urls.length

      if (box.urls.isEmpty) {
        // No worker will ever report back, so release the waiter right away;
        // otherwise it would block on the box forever.
        publishResult()
      } else {
        box.urls.foreach { url =>
          context.actorOf(Props[FileDownloaActor]) ! DownloadMsg(url)
        }
      }

    // Completion callback from a child worker.
    case NodeTaskDone(savePath) =>
      result += savePath
      downloadNum += 1
      if (downloadNum == taskNum) {
        // Every worker has answered: hand the result over to the job's initiator.
        publishResult()
      }

    case x => println("DownloadJob-- unkown msg-- " + x)
  }

  /** Stores the collected save paths in the box, then signals completion. */
  private def publishResult(): Unit = {
    // Assign the data before signalling so the waiter sees it once released.
    box.data = result.toArray
    box.done()
  }
}

启动任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package fluffy.mo.akkaCase.downloadFile02

import akka.actor.{ActorSystem, Props}

/** Entry point for version 2: starts the job and blocks the main thread until it reports back. */
object AppRun extends App {

  val system = ActorSystem("System")
  val array = Array("http://file.com/aa.txt", "http://file.com/bb.txt", "http://file.com/cc.txt")
  val box = new JobBox(array)

  try {
    system.actorOf(Props(classOf[DownloadJob], box)) ! StartJob

    // Blocks here until the job actor marks the box as done.
    val savePaths: Array[String] = box.waitingForResponse()
    println("文件下载完成 " + savePaths.length)
  } finally {
    // Always shut the actor system down, even when starting the job or waiting
    // for it throws, so the program is able to exit.
    system.terminate()
  }
}