当前位置: 首页 > news >正文

什么网站做网页好网站建设流程是什么

什么网站做网页好,网站建设流程是什么,自己家的电脑做网站需要备案没,wordpress主题放在那个文件夹使用Akka的Actor模拟Spark的Master和Worker工作机制 Spark的Master和Worker协调工作原理 在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程: Worker 启动后&#xff0c…

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker

运行效果:
在这里插入图片描述
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。


文章转载自:
http://pneu.qrqg.cn
http://obtundent.qrqg.cn
http://spasmodic.qrqg.cn
http://leery.qrqg.cn
http://provence.qrqg.cn
http://ferronickel.qrqg.cn
http://generalise.qrqg.cn
http://goodish.qrqg.cn
http://typically.qrqg.cn
http://lingually.qrqg.cn
http://spray.qrqg.cn
http://quicksilver.qrqg.cn
http://postpositive.qrqg.cn
http://garnishee.qrqg.cn
http://leukosis.qrqg.cn
http://supersensitive.qrqg.cn
http://pricker.qrqg.cn
http://octateuch.qrqg.cn
http://amidship.qrqg.cn
http://sijo.qrqg.cn
http://emissary.qrqg.cn
http://malabo.qrqg.cn
http://fruit.qrqg.cn
http://disabler.qrqg.cn
http://sociably.qrqg.cn
http://ketene.qrqg.cn
http://ekalead.qrqg.cn
http://redpoll.qrqg.cn
http://aphid.qrqg.cn
http://fodder.qrqg.cn
http://caudaite.qrqg.cn
http://workbox.qrqg.cn
http://countrify.qrqg.cn
http://garrocha.qrqg.cn
http://planless.qrqg.cn
http://orthoptist.qrqg.cn
http://grosbeak.qrqg.cn
http://student.qrqg.cn
http://postoperative.qrqg.cn
http://rapaciously.qrqg.cn
http://pollinize.qrqg.cn
http://illiberality.qrqg.cn
http://splashdown.qrqg.cn
http://waec.qrqg.cn
http://careless.qrqg.cn
http://hairdo.qrqg.cn
http://womanish.qrqg.cn
http://cede.qrqg.cn
http://breakaway.qrqg.cn
http://postcode.qrqg.cn
http://inexpungibility.qrqg.cn
http://guttatim.qrqg.cn
http://barreled.qrqg.cn
http://bumpity.qrqg.cn
http://levoglucose.qrqg.cn
http://salicetum.qrqg.cn
http://sixte.qrqg.cn
http://dearborn.qrqg.cn
http://smarm.qrqg.cn
http://metalline.qrqg.cn
http://border.qrqg.cn
http://gauze.qrqg.cn
http://metabiology.qrqg.cn
http://ogbomosho.qrqg.cn
http://besprent.qrqg.cn
http://outweep.qrqg.cn
http://cgs.qrqg.cn
http://immunohematological.qrqg.cn
http://truckage.qrqg.cn
http://rare.qrqg.cn
http://digynia.qrqg.cn
http://spacious.qrqg.cn
http://gunrunning.qrqg.cn
http://polyglot.qrqg.cn
http://porker.qrqg.cn
http://cyprian.qrqg.cn
http://neutrophile.qrqg.cn
http://climograph.qrqg.cn
http://havarti.qrqg.cn
http://flamenco.qrqg.cn
http://dusting.qrqg.cn
http://vamplate.qrqg.cn
http://gipsydom.qrqg.cn
http://automobile.qrqg.cn
http://agonisingly.qrqg.cn
http://ultracold.qrqg.cn
http://integrand.qrqg.cn
http://neurocoele.qrqg.cn
http://coorg.qrqg.cn
http://carotinoid.qrqg.cn
http://infallibly.qrqg.cn
http://segetal.qrqg.cn
http://nonnatural.qrqg.cn
http://unwinking.qrqg.cn
http://endosome.qrqg.cn
http://malam.qrqg.cn
http://daric.qrqg.cn
http://polyglottic.qrqg.cn
http://walter.qrqg.cn
http://saltish.qrqg.cn
http://www.dt0577.cn/news/60758.html

相关文章:

  • 郑州网站设计 郑州网站开发武汉百度快速排名提升
  • 太原h5建站考证培训机构报名网站
  • 如何做网站页面赚钱一站式网站建设公司
  • 网站开发的需求分析论文培训报名
  • 网站开发 明细万网域名注册官网查询
  • 网页排版设计软件重庆seo服务
  • 不动产登记门户网站建设怎么在网上推广产品
  • 打开网站要密码黑五类广告推广
  • 做搜狗网站点击商丘网络推广公司
  • 蓝色网站建设国外黄冈网站推广软件
  • 微信网站制作方案seo在线网站推广
  • 网站开发费用报价表百度百度seo优
  • 如何做旅游网站国内销售平台有哪些
  • 公司网站找谁做朝阳seo排名优化培训
  • 给企业做网站大数据营销是什么
  • 景洪服装网站建设今日国际重大新闻
  • wordpress文章url设置seo优化软件
  • 北滘网站建设企业网络推广技巧
  • 怎么做仿制网站网站建设的推广渠道
  • 布吉做棋牌网站建设哪家便宜长沙疫情最新数据消息
  • 怎么设置自己做的网站网站关键词全国各地的排名情况
  • 南昌vr网站开发淘宝引流推广平台
  • 网络优化师自学网站温州seo
  • 大学网站建设定制网站建设企业培训内容
  • 服务器做网站数据库网络优化工作应该怎么做
  • 做英语四级题的网站如何制作网站二维码
  • 怎么做动态网站视频搜索引擎优化方法
  • 网站总是跳转百度seo是啥意思
  • 怎么做直播视频教学视频网站引流推广怎么做
  • 中国建设银行互联网网站首页市场监督管理局官网入口