当前位置: 首页 > news >正文

关于网站建设的教材网站备案是什么意思

关于网站建设的教材,网站备案是什么意思,四站合一网站制作,汕头网站排名优化零、本讲学习目标 了解RDD容错机制理解RDD检查点机制的特点与用处理解共享变量的类别、特点与使用 一、RDD容错机制 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式&#xff0c…

零、本讲学习目标

  1. 了解RDD容错机制
  2. 理解RDD检查点机制的特点与用处
  3. 理解共享变量的类别、特点与使用

一、RDD容错机制

  • 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式设置检查点(checkpoint)方式

(一)血统方式

  • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

  • 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

  • RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

  • cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
  • 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

  • net.cl.rdd包里创建CheckpointDemo对象
package net.cl.rddimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object CheckpointDemo {def main(args: Array[String]): Unit = {// 设置系统属性(本地运行必须设置,否则无权访问HDFS)System.setProperty("HADOOP_USER_NAME", "root")// 创建SparkConf对象val conf = new SparkConf()// 设置应用程序名称,可在Spark WebUI里显示conf.setAppName("Spark-CheckpointDemo")// 设置集群Master节点访问地址conf.setMaster("local[2]")// 设置测试内存conf.set("spark.testing.memory", "2147480000")// 基于SparkConf对象创建SparkContext对象,该对象是提交Spark应用程序的入口val sc = new SparkContext(conf)// 设置检查点数据存储路径sc.setCheckpointDir("hdfs://master:9000/spark-ck")// 创建模拟数据RDDval rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))// 过滤结果val resultRDD = rdd.filter(_ >= 5)// 持久化RDD到内存中resultRDD.cache()// 将resultRDD标记为检查点resultRDD.checkpoint()// 第一次行动算子计算时,将把标记为检查点的RDD数据存储到文件系统指定路径中val result: String = resultRDD.collect().mkString(", ")println(result)// 第二次行动算子计算时,将直接从文件系统读取resultRDD数据,而不需要从头计算val count = resultRDD.count()println(count)// 停止Spark容器sc.stop()}
}
  • 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
  • Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。
  • 创建检查点保存数据的目录

 

  • 运行程序,在控制台查看结果

 

  • 查看HDFS检查点目录,执行命令:hdfs dfs -ls -R /spark-ck

 

三、共享变量

  • 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

  • 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。

1、默认情况下变量的传递

  • map()算子传入的函数中使用外部变量arr

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()

 

  • 上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点所有Task任务中。变量arr传递的流程如下图所示

  • 假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

  • 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值

 

  • 使用广播变量将数组arr传递给map()算子

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()

 

  • 上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。

 

  • Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
  • 输出result的数据

 

(二)累加器

1、累加器功能

  • 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

  • 对一个整型数组求和

 

  • 上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

  • 对一个整型数组求和

 

scala> val myacc = sc.longAccumulator("My Accumulator") // 声明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端输出结果
  • 上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。
  • 累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。
http://www.dt0577.cn/news/54828.html

相关文章:

  • 利用小偷程序做网站软文接单平台
  • 网络代理服务器怎么设置北京百度seo排名
  • 短视频推广策划方案windows优化大师和鲁大师
  • 河南新蔡有做网站建设的吗什么是网站
  • 北京市住房和建设委员会网站企业培训课程清单
  • 哪里有做网站的单位google seo怎么优化
  • 网站做采集会有问题么百度贴吧首页
  • 知道网站是wp程序做的如何仿站南昌seo快速排名
  • vi设计说明模板seo外包公司哪家好
  • 百度做直播和短视频网站媒体发稿网
  • FlashCS3网站建设详解宁波网站推广方案
  • 教育培训网站建设方案广州今日头条新闻
  • 北关网站制作seo常用的优化工具
  • 做网站和编程有关系吗公司网站建设教程
  • 青岛网站建设优化质量可靠上海百度推广官方电话
  • 美食网站世界足球排名最新
  • 手机客户端seo文章
  • 做群头像的网站在线制作搜索引擎网址有哪些
  • 域名命名网站营销平台建设
  • 现在网站开发语言做网络推广有哪些平台
  • 西安建网站网站推广亚马逊关键词优化怎么做
  • 西咸新区开发建设管理委员会网站百度网站链接提交入口
  • 苏州公司网站免费网站收录入口
  • 周年庆网站要怎么做6自助建站系统源码
  • 红酒网站设计短期的技能培训有哪些
  • 公司的网站如何建设方案自媒体平台app下载
  • 零食网站怎么做百度竞价排名价格查询
  • asp.net 网站运行助手我想在百度发布信息
  • 学校网站建设主要成绩百度一下你就知道官方
  • 帝国网站管理系统 数据库外贸平台哪个网站最好