Scala并发编程模型Akka

发布时间:2022-06-21 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了Scala并发编程模型Akka脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

[toc]

## Scala并发编程模型Akka

### 1. Akka简介

#### Akka是什么?

> 1. Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时,你可以理解成Akka是编写并发程序的框架。> 2. Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。> 3. Akka主要解决的问题是:可以轻松的写出高效稳定的并发程序,程序员不再过多的考虑线程、锁和资源竞争等细节。> 4. ![image-20210406221430939](assets/image-20210406221430939.png)

#### Actor 模型解决什么问题?

> 1. 处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。但是当我们对关键代码加入同步条件synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有很大影响。> 2. 若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保证。> 3. Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。 你可以这里理解:`Actor 模型是一种处理并发问题的解决方案,很牛!`

### 2. Akka中Actor模型

#### Actor模型及其说明

> ![image-20210406221510708](assets/image-20210406221510708.png)>> 1. Akka 处理并发的方法基于 Actor 模型。(示意图)> 2. 在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。> 3. Actor 模型是作为一个并发模型设计和架构的。Actor与Actor 之间只能通过消息通信,如图的信封。> 4. Actor 与 Actor 之间只能用消息进行通信,当一个 Actor 给另外一个 Actor发消息,消息是有顺序的(消息队列),只需要将消息投寄的相应的邮箱即可。> 5. 怎么处理消息是由接收消息的Actor决定的,发送消息Actor可以等待回复,也可以异步处理【ajax】> 6. ActorSystem 的职责是负责创建并管理其创建的Actor, ActorSystem 是单例的(可以ActorSystem是一个工厂,专门创建Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的。> 7. Actor模型是对并发模型进行了更高的抽象。> 8. Actor模型是`异步、非阻塞、高性能`的事件驱动编程模型。[异步、非阻塞, 最经典的案例:就是ajax异步请求处理 ]> 9. Actor模型是轻量级事件处理(1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高。

#### Actor模型工作机制说明

![image-20210406222141148](assets/image-20210406222141148.png)

> `Actor模型工作机制说明(对照工作机制示意图理解)`>> 1. ActorySystem创建Actor> 2. `ActorRef:可以理解成是Actor的代理或者引用。`消息是通过ActorRef来发送,而不能通过Actor 发送消息,通过哪个ActorRef 发消息,就表示把该消息发给哪个Actor> 3. 消息发送到`Dispatcher Message (消息分发器)`,它得到消息后,会将消息进行分发到对应的MailBox。(注: Dispatcher Message 可以理解成是一个线程池, MailBox 可以理解成是消息队列,可以缓冲多个消息,遵守FIFO)> 4. Actor 可以通过 receive方法来获取消息,然后进行处理。>> `Actor间传递消息机制(对照工作机制示意图理解)`>> 1. 每一个消息就是一个Message对象。Message 继承了Runable, 因为Message就是线程类。> 2. 从Actor模型工作机制看上去很麻烦,但是程序员编程时只需要编写Actor就可以了,其它的交给Actor模型完成即可。> 3. `A Actor要给B Actor 发送消息,那么A Actor 要先拿到(也称为持有) B Actor 的 代理对象ActorRef 才能发送消息。`

### 3. Actor模型快速入门

> 1. 编写一个SayHelloActor> 2. SayHelloActor 可以给自己发送消息 > 3. 要求使用Maven的方式来构建项目,这样可以很好的解决项目开发包的依赖关系。

#### SayHelloActor

~~~~scalapackage com.atguigu.akka01.actor

import akka.actor.{Actor}/** * @Date 2021/4/6 14:13 * @Version 10.21 * @Author DuanChaojie * 继承Actor后,SayHelloActor就是一个Actor * 重写核心方法receive */class SayHelloActor extends Actor { /** * 1、receive方法,会被该SayHelloActor的MailBox调用 * 2、当该SayHelloActor的MailBox接收到消息,就会调用receive方法 * 3、type Receive = scala.PartialFunction[scala.Any, scala.Unit],即Receive表示偏函数接收的参数类型是Any,返回类型是Unit * 4、isDefinedAt(x: Any) 如果返回true ,就会去调用 apply 构建对象实例,如果是false,过滤 * * @return Receive */ override def receive: Receive = { case "Hello" => println("SayHelloActor:Hello tom") case "Ok" => println("SayHelloActor:Ok jack") case "exit" => { println("SayHelloActor:退出系统...") // 停止actoref /**

*/

/** 1、context.stop(xxx)阻止被指定的Actor,这是一个异步操作,即涉及一个消息发送。 * 2、如果此方法应用于来自参与者内部的"self"引用, * 则该参与者保证不会在此调用后处理任何其他消息; * 请注意,当前消息的处理将继续,此方法不会立即终止此参与者。 * 3、The 'self' field holds the ActorRef for this actor * 4、self可用于向自身发送消息,格式为:self ! message */ context.stop(self)

/** * Terminates this actor system */ context.system.terminate() } case _ => println("SayHelloActor:匹配失败!") }

}~~~~

#### SayHelloActorDemo

~~~~scalapackage com.atguigu.akka01.main

import akka.actor.{ActorRef, ActorSystem, Props}import com.atguigu.akka01.actor.SayHelloActor

import scala.io.StdInimport scala.util.control.Breaks.{break, breakable}

/** * @Date 2021/4/6 15:42 * @Version 10.21 * @Author DuanChaojie */object SayHelloActorDemo { // 1、创建一个ActorSystem,专门用于创建Actor private val actorFactory = ActorSystem("actorFactory")

/** * 创建SayHelloActor的同时,返回SayHelloActor的sayHelloActorRef * 1、Props[SayHelloActor] 通过反射创建一个SayHelloActor实例 * 2、给创建的Actor(SayHelloActor)取名为sayHelloActor * 3、sayHelloActorRef: ActorRef 就是 Props[SayHelloActor] 的ActorRef * 4、创建的SayHelloActor实例被ActorSystem接管 */ private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")

def main(args: Array[String]): Unit = {

// 向sayHelloActorRef发送消息 breakable { while (true) { println(Console.GREEN + "请输入你想发的消息(提示输入完毕按回车):") val command = StdIn.readLine() sayHelloActorRef ! command Thread.sleep(1000) if (command == "exit") { break() } } }

}}~~~~

#### 效果如图:

![QQ截图20210406150212](assets/QQ截图20210406150212.png)

#### 小结和说明:

> 1. 当程序执行 `private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")`,会完成如下任务 [这是非常重要的方法]> 2. actorFactory 是 ActorSystem("actorFactory")这样创建的。> 3. 这里的 Props[SayHelloActor]会使用反射机制,创建一个SayHelloActor对象,如果是`actorFactory.actorOf(Props(new SayHelloActor), "sayHelloActor") 形式`,就是使用new 的方式创建一个SayHelloActor对象, 注意Props() 是小括号。> 4. 会创建一个SayHelloActor对象的代理对象 sayHelloActorRef, 使用sayHelloActorRef才能发送消息> 5. 会在底层创建 Dispather Message ,是一个线程池,用于分发消息, 消息是发送到对应的Actor的 MailBox> 6. 会在底层创建SayHelloActor的MailBox 对象,该对象是一个队列,可接收Dispatcher Message 发送的消息> 7. MailBox 实现了Runnable 接口,是一个线程,一直运行并调用Actor的receive 方法,因此当Dispather 发送消息到MailBox时,Actor 在receive 方法就可以得到信息> 8. `sayHelloActorRef! "hello"`, 表示把hello消息发送到sayHelloActorRef的mailbox (通过Dispatcher Message 转发)

### 4. Actor模型应用实例-Actor间通讯

> 1. 编写2个 Actor , 分别是 DdActor和 MmActor> 2. DdActor和MmActor之间可以相互发送消息> 3. 加强对Actor传递消息机制的理解

#### DdActor

```scalapackage com.atguigu.akka02.actor

import akka.actor.{Actor, ActorRef}

/** * @Date 2021/4/6 15:38 * @Version 10.21 * @Author DuanChaojie */class DdActor(mmActorRef: ActorRef) extends Actor { val myMmActorRef = mmActorRef

override def receive: Receive = { case "Go!" => { println("3s后开启世界大战!") Thread.sleep(3000) myMmActorRef ! "DD" } case "MM" => { Thread.sleep(1000) println("MM:一起喵喵喵喵喵~") myMmActorRef ! "DD" } }}```

#### MmActor

~~~~scalapackage com.atguigu.akka02.actor

import akka.actor.Actor

/** * @Date 2021/4/6 15:38 * @Version 10.21 * @Author DuanChaojie */class MmActor extends Actor { override def receive: Receive = { case "DD" => { Thread.sleep(1000) println("DD:我们一起学猫叫~") sender() ! "MM" } }}~~~~

#### Main

```scalapackage com.atguigu.akka02.main

import akka.actor.{ActorRef, ActorSystem, Props}import com.atguigu.akka02.actor.{DdActor, MmActor}

import scala.io.StdIn

/** * @Date 2021/4/6 15:40 * @Version 10.21 * @Author DuanChaojie */object Main extends App { private val actorFactory = ActorSystem("actorFactory")

private val mmActorRef: ActorRef = actorFactory.actorOf(Props[MmActor], "mmActor") private val ddActorRef: ActorRef = actorFactory.actorOf(Props(new DdActor(mmActorRef)), "ddActor")

println(Console.GREEN + "请输入Start...")

val command = StdIn.readLine()

if (command.toLowerCase == "start") { ddActorRef ! "Go!" }}```

#### 效果图:

![image-20210406223923598](assets/image-20210406223923598.png)

#### 小结:

> 1. 两个Actor通讯机制和Actor 自身发消息机制基本一样,只是要注意如下> 2. 如果DdActor 在需要给MmActor 发消息,则需要持有MmActor 的 ActorRef,可以通过创建时,传入MmActor的 代理对象(ActorRef)> 3. 当MmActor 在receive 方法中接收到消息,需要回复时,可以通过sender() 获取到发送Actor的 代理对象。

### 5. Akka网络编程

> Akka支持面向大并发后端服务程序,网络通信这块是服务端程序重要的一部分。>> 网络编程有两种:>> 1. `TCP socket编程`,是网络编程的主流。之所以叫Tcp socket编程,是因为底层是基于Tcp/ip协议的. 比如: QQ聊天 [示意图]> 2. `B/S结构的Http编程`,我们使用浏览器去访问服务器时,使用的就是Http协议,而http底层依旧是用tcp socket实现的。 比如: 京东商城 【属于 web 开发范畴 】

#### 网络编程基础知识

##### TCP/IP模型

> 1. ![image-20210406225122074](assets/image-20210406225122074.png)>> >> 2. 深入理解:qq间相互通讯的案例> 3. ![image-20210406225227857](assets/image-20210406225227857.png)> 4. tracert的使用案例:> 5. ![image-20210406225335990](assets/image-20210406225335990.png)

##### IP地址

> 每个internet上的主机和路由器都有一个ip地址,它包括网络号和主机号,ip地址有ipv4(32位)或者ipv6(128位). 可以通过ipconfig 来查看

##### 端口port

> 1. > 我们这里所指的端口不是指物理意义上的端口,而是特指TCP/IP协议中的端口,是逻辑意义上的端口。> 2. 如果把IP地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个IP地址的端口可以有`65535`(即:`256×256-1`)个之多!端口是通过端口号来标记的。(端口号 0:Reserved)> 3. ==端口port的分类==> - 0号是保留端口> - 1-1024是固定端口,又叫有名端口,即被某些程序固定使用,一般程序员不使用> - 22: SSH远程登录协议 > - 23: telnet使用 > - 21: ftp使用 > - 25: smtp服务使用 > - 80: iis使用 > - 7: echo服务> - 1025-65535是动态端口,这些端口,程序员可以使用。> 4. 端口port使用注意> 1. 在计算机(尤其是做服务器)要尽可能的少开端口> 2. 一个端口只能被一个程序监听( )> 3. 如果使用 netstat –an 可以查看本机有哪些端口在监听> 4. 可以使用 netstat –anb 来查看监听端口的pid,在结合任务管理器关闭不安全的端口。

#### Akka网络编程-小黄鸡客服案例

> 需求分析:>> 1. 服务端进行监听(8888)> 2. 客户端可以通过键盘输入,发送咨询问题给小黄鸡客服(服务端)> 3. 小黄鸡(服务端) 回答客户的问题

##### 服务端--ServerMain

```scalapackage com.atguigu.akka03.server

import akka.actor.{ActorRef, ActorSystem, Props}import com.typesafe.config.ConfigFactory

/** * @Date 2021/4/6 16:18 * @Version 10.21 * @Author DuanChaojie */object ServerMain extends App{ val serverHost = "127.0.0.1" val serverPort = 8888

/** * 对于此字符串中的每一行: * 从行中删除由空格或控制字符('|')组成的前导前缀。 * 创建config对象,指定协议类型,监听的ip和端口 */ val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$serverHost |akka.remote.netty.tcp.port=$serverPort """.stripMargin)

private val serverActorFactory = ActorSystem("serverActorFactory",config)

private val yellowChickenServerRef: ActorRef = serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")

// ServerMain启动 yellowChickenServerRef ! "start".toLowerCase}```

##### 服务端--YellowChickenServer

```scalapackage com.atguigu.akka03.server

import akka.actor.Actorimport com.atguigu.akka03.common.{ClientMessage, ServerMessage}

/** * @Date 2021/4/6 16:17 * @Version 10.21 * @Author DuanChaojie */class YellowChickenServer extends Actor { override def receive: Receive = { case "start" => println(Console.BLUE + "YellowChickenServer已经启动....") case ClientMessage(msg) => { // TODO match模糊匹配 msg match { case "java" => sender() ! ServerMessage("Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的高级程序设计语言。") case "javascript" => sender() ! ServerMessage("JavaScript在1995年由Netscape公司的Brendan Eich,在网景导航者浏览器上首次设计实现而成。因为Netscape与Sun合作,Netscape管理层希望它外观看起来像Java,因此取名为JavaScript。但实际上它的语法风格与Self及Scheme较为接近。") case "大数据" => sender() ! ServerMessage("大数据(big data),IT行业术语,是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。") case "作者" => sender() ! ServerMessage("https://blog.csdn.net/weixin_45267102/article/details/111472987") case _ => println("Nothing~") } } }}```

##### 客户端--ClientMain

```scalapackage com.atguigu.akka03.client

import akka.actor.{ActorRef, ActorSystem, Props}import com.atguigu.akka01.main.SayHelloActorDemo.sayHelloActorRefimport com.typesafe.config.ConfigFactory

import scala.io.StdInimport scala.util.control.Breaks.{break, breakable}

/** * @Date 2021/4/6 16:18 * @Version 10.21 * @Author DuanChaojie */object ClientMain extends App { val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 6666, "127.0.0.1", 8888)

/** * 对于此字符串中的每一行: * 从行中删除由空格或控制字符('|')组成的前导前缀。 * 创建config对象,指定协议类型,监听的ip和端口 */ val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$clientHost |akka.remote.netty.tcp.port=$clientPort """.stripMargin) val clientActorFactory = ActorSystem("clientActorFactory", config) val yellowChickenClientRef: ActorRef = clientActorFactory.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient")

// ClientMain启动 yellowChickenClientRef ! "start".toLowerCase

// 向yellowChickenClientRef发送消息 breakable { while (true) { Thread.sleep(1000) println(Console.GREEN + "请输入你咨询的问题(提示输入完毕按回车):") val command = StdIn.readLine() yellowChickenClientRef ! command if (command == "exit") { break() } } }}```

##### 客户端--YellowChickenClient

```scalapackage com.atguigu.akka03.client

import akka.actor.{Actor, ActorSelection}import com.atguigu.akka03.common.{ClientMessage, ServerMessage}

/** * @Date 2021/4/6 16:18 * @Version 10.21 * @Author DuanChaojie */class YellowChickenClient(serverHost: String, serverPort: Int) extends Actor {

var yellowChickenServerRef: ActorSelection = _

/** * 1、在Actor中有一个方法PreStart方法,他会在actor运行前执行 * 2、在akka的开发中,通常将初始化的工作,放在preStart方法 */ override def preStart(): Unit = { /** 注意: * serverActorFactory 是server端 ActorSystem("serverActorFactory",config) * user/后面是 serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer") */ yellowChickenServerRef = context.actorSelection(s"akka.tcp://serverActorFactory@${serverHost}:${serverPort}/user/yellowChickenServer") println(yellowChickenServerRef) }

override def receive: Receive = { case "start" => println(Console.BLUE + "YellowChickenClient已经启动....") // 将咨询的问题发送到Server端 case msg: String => { yellowChickenServerRef ! ClientMessage(msg.toLowerCase) } case ServerMessage(msg) => { println(s"YellowChickenServer: $msg") } }

}```

##### MessageProtocol

```scala/** * @Date 2021/4/6 16:19 * @Version 10.21 * @Author DuanChaojie */class MessageProtocol {}/** * 使用样例类来构建协议 * 客户端发给服务器协议(序列化的对象) * @param mes */case class ClientMessage(mes: String)

/** * 服务端发给客户端的协议(样例类对象) * @param mes */case class ServerMessage(mes: String)```

##### 项目结构图:

![image-20210406230448869](assets/image-20210406230448869.png)

##### 效果图:

![image-20210406230528305](assets/image-20210406230528305.png)

![image-20210406230612163](assets/image-20210406230612163.png)

### 6. Spark Master Worker 进程通讯项目

#### 项目意义:

> 1. 深入理解Spark的Master和Worker的通讯机制> 2. 为了方便同学们看Spark的底层源码,命名的方式和源码几乎保持一致(如: 通讯消息类命名就是一样的)> 3. ==加深对主从服务心跳检测机制(HeartBeat)的理解==,方便以后spark源码二次开发。

#### 项目需求分析:

![image-20210406230859899](assets/image-20210406230859899.png)

> 1. worker注册到Master, Master完成注册,并回复worker注册成功> 2. worker定时发送心跳,并在Master接收到> 3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间> 4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从workers(HashMap)中删除> 5. master worker 进行分布式部署(Linux系统)

#### 功能实现:

##### SparkMaster

```scalapackage com.atguigu.spark.master

import akka.actor.{Actor, ActorSystem, Props}import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, RemoveTimeOutWorker, StartTimeOutWorker, WorkerInfo}import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._import scala.collection.mutable

/** * @Date 2021/4/6 18:35 * @Version 10.21 * @Author DuanChaojie */class SparkMaster extends Actor {

/** * SparkMaster维护一个SparkWorker的mutable.map(id,WorkerInfo(id, cpu, ram)) */ val workers = mutable.Map[String, WorkerInfo]()

override def receive: Receive = { case "start" => { println(Console.BLUE + "SparkMaster启动了...") self ! StartTimeOutWorker } case RegisterWorkerInfo(id, cpu, ram) => { // SparkMaster处理注册信息 if (!workers.contains(id)) { val workerInfo = new WorkerInfo(id, cpu, ram) // 将wokerInfo加入到workers中 workers += (id -> workerInfo) // 告诉SparkWorker注册成功 sender() ! RegisteredWorkerInfo } println("workers = " + workers) } case HeartBeat(id) => { val workerInfo = workers(id) workerInfo.lastHeartBeat = System.currentTimeMillis() println("SparkMaster更新了id = " + id + "的心跳时间为:" + workerInfo.lastHeartBeat) } case StartTimeOutWorker => { import context.dispatcher context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker) } case RemoveTimeOutWorker => { //拿到所有的workerInfo val workerInfos = workers.values val deadWorkerInfos = workerInfos.filter(workerInfo => (System.currentTimeMillis() - workerInfo.lastHeartBeat) > 6000) deadWorkerInfos.foreach(workerInfo => workers.remove(workerInfo.id)) println(s"当前有${workers.size}个sparkWorker存活~") } }}

object SparkMaster { /** * @param args (0) serverHost args(1) serverPort */ def main(args: Array[String]): Unit = {

val masterHost = args(0) val masterPort = args(1).toInt

/** 创建config对象,指定协议类型,监听的ip和端口 */ val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$masterHost |akka.remote.netty.tcp.port=$masterPort """.stripMargin)

val sparkMasterSystem = ActorSystem("sparkMasterSystem", config) val sparkMaster = sparkMasterSystem.actorOf(Props[SparkMaster], "sparkMaster")

sparkMaster ! "start" }}```

##### SparkWorker

```scalapackage com.atguigu.spark.worker

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._

/** * @Date 2021/4/6 18:36 * @Version 10.21 * @Author DuanChaojie */class SparkWorker(masterHost: String, masterPort: Int) extends Actor {

var sparkMaster: ActorSelection = _

override def preStart(): Unit = { sparkMaster = context.actorSelection(s"akka.tcp://sparkMasterSystem@${masterHost}:${masterPort}/user/sparkMaster") }

// 使用UUID生成sparkWorker的id private val id: String = java.util.UUID.randomUUID().toString

override def receive: Receive = { case "start" => { println(Console.BLUE + "SparkMaster启动了...") println("正在向SparkMaster注册自己...") //RegisterWorkerInfo(id: String, cpu: Int, ram: Int) sparkMaster ! RegisterWorkerInfo(id, 4, 256 * 1024) } case RegisteredWorkerInfo => { println("已经向SparkMaster注册成功!")

import context.dispatcher context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat) }

case SendHeartBeat => { println("给SparkMaster发送心跳~") sparkMaster ! HeartBeat(id) } }}

object SparkWorker { /** * @param args * workerHost args(0) * workerPort args(1) * masterHost args(2) * masterPort args(3) */ def main(args: Array[String]): Unit = {

val (workerHost, workerPort, masterHost, masterPort) = (args(0),args(1).toInt,args(2),args(3).toInt) /** 创建config对象,指定协议类型,监听的ip和端口 */ val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$workerHost |akka.remote.netty.tcp.port=$workerPort """.stripMargin) val sparkWorkerSystem = ActorSystem("sparkWorkerSystem", config) val sparkWorker = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "sparkWorker")

sparkWorker ! "start" }}```

##### MessageProtocol

~~~~scalapackage com.atguigu.spark.common

/** * @Date 2021/4/6 18:37 * @Version 10.21 * @Author DuanChaojie * MessageProtocol.scala */class MessageProtocol {}

/** * worker注册信息 * @param id * @param cpu * @param ram */case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

/** * 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker) * 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间) * @param id * @param cpu * @param ram */class WorkerInfo(val id: String, val cpu: Int, val ram: Int) { var lastHeartBeat : Long = System.currentTimeMillis()}

/** * 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象 */case object RegisteredWorkerInfo

/** * worker每隔一定时间由定时器发给自己的一个消息 */case object SendHeartBeat

/** * worker每隔一定时间由定时器触发,而向master发现的协议消息 * @param id */case class HeartBeat(id: String)

/** * master给自己发送一个触发检查超时worker的信息 */case object StartTimeOutWorker

/** * master给自己发消息,检测worker,对于心跳超时的 */case object RemoveTimeOutWorker~~~~

##### 项目结构图:

![image-20210406231338055](assets/image-20210406231338055.png)

##### 项目效果图:

![image-20210406231424443](assets/image-20210406231424443.png)

![image-20210406231527269](assets/image-20210406231527269.png)

![image-20210406231540071](assets/image-20210406231540071.png)、

##### 进行分布式部署(Linux系统)

> 打包

![image-20210406231833163](assets/image-20210406231833163.png)

> 更名后上传到服务器(linux)上:

~~~SHELL#依次执行以下命令java -jar SparkMaster.jar 127.0.0.1 7777

java -jar SparkWorker.jar 127.0.0.1 6666 127.0.0.1 7777java -jar SparkWorker.jar 127.0.0.1 5555 127.0.0.1 7777java -jar SparkWorker.jar 127.0.0.1 4444 127.0.0.1 7777~~~

> 效果图如下:

![image-20210406224411623](assets/image-20210406224411623.png)

![image-20210406224437635](assets/image-20210406224437635.png)

![image-20210406224501292](assets/image-20210406224501292.png)

![image-20210406224530020](assets/image-20210406224530020.png)

## ☆

脚本宝典总结

以上是脚本宝典为你收集整理的Scala并发编程模型Akka全部内容,希望文章能够帮你解决Scala并发编程模型Akka所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签:并发