Scala-Akka 实例 Akka 实例
18.1需求分析
实现一个分布式模型,Master 保持所有 Worker 节点的信息,根据
Worker 的心跳信息维持与 Worker 的连接,Worker 启动时向 Master 节点进行
注册,Master 节点回复 ACK 信息。
18.2项目源代码
18.2.1 新建 Maven 项目 AkkaSystem
pom.xml 文件如下:
18.2.2 WorkInfo 类抽象
class WorkerInfo(val id : String, val workerHost : String, val memory : String, val cores : String) { var lastHeartbeat : Long = System.currentTimeMillis() override def toString = s"WorkerInfo($id, $workerHost, $memory, $cores)" }
18.2.3 ActorMessage
case class RegisterWorker(val id : String, val workerHost : String, val memory : String, val cores : String) case class HeartBeat(val workid : String) case class CheckOfTimeOutWorker() case class RegisteredWorker(val workerHost : String) case class SendHeartBeat()
18.2.4 Master
import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global class Master extends Actor{ //保存 WorkerID 和 Work 信息的 map val idToWorker = new mutable.HashMap[String, WorkerInfo] //保存所有 Worker 信息的 Set val workers = new mutable.HashSet[WorkerInfo] //Worker 超时时间 val WORKER_TIMEOUT = 10 * 1000 //构造方法执行完执行一次 override def preStart(): Unit = { //启动定时器,定时执行 context.system.scheduler.schedule(5 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker) } //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息 override def receive: Receive = { //Worker 向 Master 发送的注册消息 case RegisterWorker(id, workerHost, memory, cores) => { if(!idToWorker.contains(id)) { val worker = new WorkerInfo(id, workerHost, memory, cores) workers.add(worker) idToWorker(id) = worker println("new register worker: "+worker) sender ! RegisteredWorker(worker.id) } } //Worker 向 Master 发送的心跳消息 case HeartBeat(workerId) => { val workerInfo = idToWorker(workerId) println("get heartbeat message from: "+workerInfo) workerInfo.lastHeartbeat = System.currentTimeMillis() } //Master 自己向自己发送的定期检查超时 Worker 的消息 case CheckOfTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray for(worker <- toRemove){ workers -= worker idToWorker.remove(worker.id) } println("worker size: " + workers.size) } } } object Master { //程序执行入口 def main(args: Array[String]) { val host = "localhost" val port = 8888 //创建 ActorSystem 的必要参数 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem 是单例的,用来创建 Actor val actorSystem = ActorSystem.create("MasterActorSystem", config) //启动 Actor,Master 会被实例化,生命周期方法会被调用 actorSystem.actorOf(Props[Master], "Master") } }
18.2.5 Worker
import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global class Worker extends Actor{ //Worker 端持有 Master 端的引用(代理对象) var master: ActorSelection = null //生成一个 UUID,作为 Worker 的标识 val id = UUID.randomUUID().toString //构造方法执行完执行一次 override def preStart(): Unit = { //Worker 向 MasterActorSystem 发送建立连接请求 master = context.system.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/use r/Master") //Worker 向 Master 发送注册消息 master ! RegisterWorker(id, "localhost", "10240", "8") } //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息 override def receive: Receive = { //Master 向 Worker 的反馈信息 case RegisteredWorker(masterUrl) => { //启动定时任务,向 Master 发送心跳 context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat) } case SendHeartBeat => { println("worker send heartbeat") master ! HeartBeat(id) } } } object Worker { def main(args: Array[String]) { val clientPort = 8889 //创建 WorkerActorSystem 的必要参数 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.port = $clientPort """.stripMargin val config = ConfigFactory.parseString(configStr) val actorSystem = ActorSystem("WorkerActorSystem", config) //启动 Actor,Master 会被实例化,生命周期方法会被调用 actorSystem.actorOf(Props[Worker], "Worker") } }
18.3项目运行
Master:
Worker: