描述
Akka 的编程模型本身并不建议阻塞等待返回结果。
但可以给 actor 增加状态来对任务计数，并配合子 actor 的消息回传，
在全部任务完成后把结果汇总返回给发起方，从而达到获取结果的目的。
本文演示这种做法的两种实现。
版本1
该版本是通过异步回调的方式获得结果
消息类
1
2
3
4
|
/** Command that tells a `DownloadJob` actor to start dispatching its downloads. */
case object StartJob

/** Request asking a download worker to fetch one file.
  *
  * @param str URL of the file to download
  */
final case class DownloadMsg(str: String)

/** Reply sent by a download worker once it has finished its file.
  *
  * @param savePath text describing where the downloaded file was stored
  */
final case class NodeTaskDone(savePath: String)
|
执行文件下载的actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
package fluffy.mo.akkaCase.downloadFile01
import java.util.concurrent.TimeUnit
import akka.actor.Actor
/** Worker actor that simulates downloading a single file.
  *
  * For every [[DownloadMsg]] it prints a progress line, pauses briefly in place of
  * a real transfer, and answers the sender with a [[NodeTaskDone]].
  * Any other message is only printed.
  */
class FileDownloaActor extends Actor {
  def receive: Receive = {
    case DownloadMsg(_) =>
      println("下载文件")
      // NOTE(review): the unit is MICROSECONDS, so this pauses for 0.3 ms —
      // confirm that milliseconds were not intended.
      TimeUnit.MICROSECONDS.sleep(300)
      val reply = NodeTaskDone("下载完成--返回存储位置")
      sender() ! reply

    case other =>
      println(s"FileDownloaActor-- unkown msg-- $other")
  }
}
|
任务分发的actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package fluffy.mo.akkaCase.downloadFile01
import akka.actor.{Actor, ActorRef, Props}
import scala.collection.mutable.ArrayBuffer
/** Coordinator actor that fans a list of URLs out to download workers and reports
  * the collected results back to whoever started the job.
  *
  * Protocol:
  *  - [[StartJob]] spawns one `FileDownloaActor` child per URL and remembers the sender
  *    as the requester. A second `StartJob` while a job is running is rejected with an
  *    `akka.actor.Status.Failure` reply; the running job is left untouched.
  *  - [[NodeTaskDone]] from a child is recorded; once every child has answered, the
  *    requester receives an `ArrayBuffer[String]` containing all save paths.
  *  - Anything else is printed.
  *
  * @param urls URLs to download; one worker is created per entry
  */
class DownloadJob(urls: Array[String]) extends Actor {
  private var running = false
  private var taskNum = 0
  private var downloadNum = 0
  private val result = ArrayBuffer[String]()
  private var taskSender: Option[ActorRef] = None

  def receive: Receive = {
    // Reject a duplicate start with a reply instead of throwing: an exception here
    // would hand the actor to its supervisor and put the in-flight job at risk.
    case StartJob if running =>
      sender() ! akka.actor.Status.Failure(
        new IllegalStateException("DownloadJob is already running"))

    // Start the job: one child worker per URL.
    case StartJob =>
      running = true
      taskSender = Some(sender())
      taskNum = urls.length
      urls.foreach { url =>
        context.actorOf(Props[FileDownloaActor]) ! DownloadMsg(url)
      }
      // With no URLs no worker will ever report back, so answer right away.
      if (taskNum == 0) notifyRequester()

    // Completion callback from a child worker.
    case NodeTaskDone(savePath) =>
      result += savePath
      downloadNum += 1
      if (downloadNum == taskNum) notifyRequester()

    case x => println("DownloadJob-- unkown msg-- " + x)
  }

  /** Sends the collected save paths to the requester, if one has been recorded. */
  private def notifyRequester(): Unit =
    // Send a copy so the actor's own mutable buffer is never shared with other threads.
    taskSender.foreach(_ ! result.clone())
}
|
启动任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package fluffy.mo.akkaCase.downloadFile01
import akka.actor.{ActorSystem, Props}
/** Entry point for version 1: asks a [[DownloadJob]] actor to run and reacts to the
  * reply asynchronously.
  *
  * The actor system is terminated once the ask completes, whether it succeeded or
  * failed (for example because the 25 second timeout expired).
  */
object AppRun extends App {
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._
  import scala.util.{Failure, Success}

  implicit val ec = akka.dispatch.ExecutionContexts.global
  val system = ActorSystem("System")
  val array = Array("http://file.com/aa.txt","http://file.com/bb.txt","http://file.com/cc.txt")
  val actor = system.actorOf(Props(classOf[DownloadJob], array))
  // Dot notation avoids the postfix-operator form, which needs scala.language.postfixOps.
  implicit val timeout: Timeout = Timeout(25.seconds)
  val future = actor ? StartJob

  // onComplete (rather than map) so a failed or timed-out ask is reported and the
  // actor system is still shut down instead of keeping the JVM alive.
  future.onComplete { outcome =>
    outcome match {
      case Success(result) => println("文件下载完成 " + result)
      case Failure(cause)  => Console.err.println(s"Download job failed: $cause")
    }
    system.terminate()
  }
}
|
版本2
该版本是通过线程阻塞的方式等待结果
消息类
通过 CountDownLatch 完成线程的阻塞与唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/** Command that tells a `DownloadJob` actor to start dispatching its downloads. */
case object StartJob

/** Request asking a download worker to fetch one file.
  *
  * @param str URL of the file to download
  */
final case class DownloadMsg(str: String)

/** Reply sent by a download worker once it has finished its file.
  *
  * @param savePath text describing where the downloaded file was stored
  */
final case class NodeTaskDone(savePath: String)
/** Hand-off box shared between the thread that starts a job and the code that
  * finishes it.
  *
  * The producer stores its output in `data` and then calls `done()`; a consumer
  * blocked in `waitingForResponse()` is released by that call and reads `data`.
  *
  * @param turls URLs the job should process, exposed unchanged as `urls`
  */
class JobBox(turls: Array[String]) {

  /** URLs handed in at construction time. */
  val urls: Array[String] = turls

  /** Single-count latch that is released by `done()`. */
  val countDownLatch: CountDownLatch = new CountDownLatch(1)

  /** Result slot; stays `null` until a producer assigns it. */
  var data: Array[String] = null

  /** Blocks the calling thread until `done()` has been called, then returns `data`. */
  def waitingForResponse(): Array[String] = {
    countDownLatch.await()
    data
  }

  /** Releases every thread waiting in `waitingForResponse()`. */
  def done(): Unit = countDownLatch.countDown()
}
|
执行文件下载的actor
同版本1 无变化
任务分发的actor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package fluffy.mo.akkaCase.downloadFile02
import akka.actor.{Actor, Props}
import scala.collection.mutable.ArrayBuffer
/** Coordinator actor that fans the URLs of a [[JobBox]] out to download workers and
  * publishes the collected results through that box.
  *
  * Protocol:
  *  - [[StartJob]] spawns one `FileDownloaActor` child per URL in `box.urls`. A second
  *    `StartJob` while a job is running is rejected with an `akka.actor.Status.Failure`
  *    reply; the running job is left untouched.
  *  - [[NodeTaskDone]] from a child is recorded; once every child has answered, the
  *    save paths are stored in `box.data` and `box.done()` is called.
  *  - Anything else is printed.
  *
  * @param box carries the input URLs and receives the result
  */
class DownloadJob(box: JobBox) extends Actor {
  private var running = false
  private var taskNum = 0
  private var downloadNum = 0
  private val result = ArrayBuffer[String]()

  def receive: Receive = {
    // Reject a duplicate start with a reply instead of throwing: an exception here
    // would hand the actor to its supervisor and put the in-flight job at risk,
    // which could leave the box's waiter blocked.
    case StartJob if running =>
      sender() ! akka.actor.Status.Failure(
        new IllegalStateException("DownloadJob is already running"))

    // Start the job: one child worker per URL.
    case StartJob =>
      running = true
      taskNum = box.urls.length
      box.urls.foreach { url =>
        context.actorOf(Props[FileDownloaActor]) ! DownloadMsg(url)
      }
      // With no URLs no worker will ever report back, so release the waiter right away.
      if (taskNum == 0) publishResult()

    // Completion callback from a child worker.
    case NodeTaskDone(savePath) =>
      result += savePath
      downloadNum += 1
      if (downloadNum == taskNum) publishResult()

    case x => println("DownloadJob-- unkown msg-- " + x)
  }

  /** Stores the collected save paths in the box and signals completion. */
  private def publishResult(): Unit = {
    // Assign the data before signalling so the value is in place when done() is called.
    box.data = result.toArray
    box.done()
  }
}
|
启动任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package fluffy.mo.akkaCase.downloadFile02
import akka.actor.{ActorSystem, Props}
/** Entry point for version 2: starts a [[DownloadJob]] actor and blocks the main
  * thread on the shared [[JobBox]] until the job has published its result.
  *
  * The actor system is terminated even if the blocking wait throws.
  */
object AppRun extends App {
  // Not referenced elsewhere in this object; kept so the object's members are unchanged.
  implicit val ec = akka.dispatch.ExecutionContexts.global
  val system = ActorSystem("System")
  val array = Array("http://file.com/aa.txt", "http://file.com/bb.txt", "http://file.com/cc.txt")
  val box = new JobBox(array)
  system.actorOf(Props(classOf[DownloadJob], box)) ! StartJob

  // try/finally guarantees shutdown when the wait is interrupted; otherwise the
  // actor system would never be terminated.
  val savePaths: Array[String] =
    try box.waitingForResponse()
    finally system.terminate()

  println("文件下载完成 " + savePaths.length)
}
|
文章作者
duansheli
上次更新
2019-12-25
(325c7b3)