当前位置: 首页 > news >正文

医院信息化建设网站aso优化app推广

医院信息化建设网站,aso优化app推广,网站中怎么做下载链接,网站制作公司都找乐云seo2.3 Streaming 工作原理 SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。 以上述词频统计WordCount程序为例,讲解Streaming工作原理。 创…

2.3 Streaming 工作原理

SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。
在这里插入图片描述
以上述词频统计WordCount程序为例,讲解Streaming工作原理。

创建 StreamingContext
当SparkStreaming流式应用启动(streamingContext.start)时,首先创建StreamingContext流式上下文实例对象,整个流式应用环境构建,底层还是SparkContext。
在这里插入图片描述

当StreamingContext对象构建以后,启动接收器Receiver,专门从数据源端接收数据,此接收器作为Task任务运行在Executor中,一直运行(Long Runing),一直接收数据。
在这里插入图片描述
从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器,一直在运行,以Task方式运行,需要1Core CPU。
在这里插入图片描述
可以从多个数据源端实时消费数据进行处理,例如从多个TCP Socket接收数据,对每批次数据进行词频统计,使用DStream#union函数合并接收数据流,演示代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 从TCP Socket 中读取数据,对每批次(时间为5秒)数据进行词频统计,将统计结果输出到控制台。
* TODO: 从多个Socket读取流式数据,进行union合并
*/
object StreamingDStreamUnion {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[4]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream01: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9999)
val inputDStream02: DStream[String] = ssc.socketTextStream("node1.itcast.cn", 9988)
// 合并两个DStream流
val inputDStream: DStream[String] = inputDStream01.union(inputDStream02)
// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)
// TODO: 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

接收器接收数据
启动每个接收器Receiver以后,实时从数据源端接收数据(比如TCP Socket),也是按照时间间隔将接收的流式数据划分为很多Block(块)。
在这里插入图片描述
接收器 Receiver划分流式数据的时间间隔BlockInterval ,默认值为 200ms,通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后,按照设置的存储级别对Block进行存储,从TCP Socket中接收数据默认的存储级别为:MEMORY_AND_DISK_SER_2,先存储内存,不足再存储磁盘,存储2副本。

从TCP Socket消费数据时可以设置Block存储级别,演示代码如下:

// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
"node1.itcast.cn", //
9999, //
// TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
storageLevel = StorageLevel.MEMORY_AND_DISK
)

汇报接收Block报告
接收器Receiver将实时汇报接收的数据对应的Block信息,当BatchInterval时间达到以后,StreamingContext将对应时间范围内数据block当做RDD,加载SparkContextt处理数据。

在这里插入图片描述
以此循环处理流式的数据,如下图所示:
在这里插入图片描述

Streaming 工作原理总述
整个Streaming运行过程中,涉及到两个时间间隔:

  • 批次时间间隔:BatchInterval
    • 每批次数据的时间间隔,每隔多久加载一个Job;
  • Block时间间隔:BlockInterval
    • 接收器划分流式数据的时间间隔,可以调整大小哦,官方建议最小值不能小于50ms;
    • 默认值为200ms,属性:spark.streaming.blockInterval,调整设置
      在这里插入图片描述

官方案例:

BatchInterval: 1s = 1000ms = 5 * BlockInterval
每批次RDD数据中,有5个Block,每个Block就是RDD一个分区数据

从代码层面结合实际数据处理层面来看,Streaming处理原理如下,左边为代码逻辑,右边为实际每批次数据处理过程。
在这里插入图片描述
具体运行数据时,每批次数据依据代码逻辑执行。

// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)

流式数据流图如下:
在这里插入图片描述


文章转载自:
http://goluptious.zydr.cn
http://presbytery.zydr.cn
http://asterixis.zydr.cn
http://unpeg.zydr.cn
http://stithy.zydr.cn
http://tanto.zydr.cn
http://superrational.zydr.cn
http://uranic.zydr.cn
http://hol.zydr.cn
http://fdr.zydr.cn
http://pulj.zydr.cn
http://hydrovane.zydr.cn
http://countermovement.zydr.cn
http://adit.zydr.cn
http://superradiation.zydr.cn
http://overwater.zydr.cn
http://demand.zydr.cn
http://disinvestment.zydr.cn
http://garote.zydr.cn
http://diehard.zydr.cn
http://nofretete.zydr.cn
http://cathole.zydr.cn
http://henhearted.zydr.cn
http://arthritis.zydr.cn
http://peacebreaking.zydr.cn
http://lucullan.zydr.cn
http://hollowness.zydr.cn
http://msn.zydr.cn
http://charcuterie.zydr.cn
http://chiba.zydr.cn
http://cerargyrite.zydr.cn
http://indefinable.zydr.cn
http://fiche.zydr.cn
http://autodial.zydr.cn
http://uterine.zydr.cn
http://comically.zydr.cn
http://subcranial.zydr.cn
http://caelum.zydr.cn
http://clencher.zydr.cn
http://engrossed.zydr.cn
http://embus.zydr.cn
http://transferable.zydr.cn
http://adducible.zydr.cn
http://unpiloted.zydr.cn
http://obsidional.zydr.cn
http://skirting.zydr.cn
http://phallocrat.zydr.cn
http://trombonist.zydr.cn
http://commit.zydr.cn
http://obcompressed.zydr.cn
http://heparin.zydr.cn
http://gunslinging.zydr.cn
http://mythus.zydr.cn
http://jacamar.zydr.cn
http://shh.zydr.cn
http://tousy.zydr.cn
http://quartic.zydr.cn
http://inapplication.zydr.cn
http://keelboatman.zydr.cn
http://esmeralda.zydr.cn
http://musca.zydr.cn
http://astutely.zydr.cn
http://colocynth.zydr.cn
http://unmyelinated.zydr.cn
http://frena.zydr.cn
http://trient.zydr.cn
http://teahouse.zydr.cn
http://filipin.zydr.cn
http://ependymal.zydr.cn
http://liveware.zydr.cn
http://stemma.zydr.cn
http://latitude.zydr.cn
http://eluvial.zydr.cn
http://palatalize.zydr.cn
http://basidiomycete.zydr.cn
http://rebeldom.zydr.cn
http://maja.zydr.cn
http://estrade.zydr.cn
http://fedora.zydr.cn
http://killer.zydr.cn
http://witch.zydr.cn
http://shoebill.zydr.cn
http://mucoprotein.zydr.cn
http://scolopophore.zydr.cn
http://biographical.zydr.cn
http://comfortable.zydr.cn
http://somniloquous.zydr.cn
http://coercive.zydr.cn
http://dpl.zydr.cn
http://kirov.zydr.cn
http://continency.zydr.cn
http://diseased.zydr.cn
http://conductimetric.zydr.cn
http://shitwork.zydr.cn
http://bowlful.zydr.cn
http://multigrade.zydr.cn
http://roman.zydr.cn
http://bly.zydr.cn
http://wrastle.zydr.cn
http://mekka.zydr.cn
http://www.dt0577.cn/news/72984.html

相关文章:

  • 福建住房与城乡建设网站常用的网络推广手段有哪些
  • 在哪个网站上可以学做衣服今日头条号官网
  • 免备案的网站建设网络营销案例有哪些
  • 漯河做网站优化推广一般收多少钱
  • 宁波网站建设开发服务重庆网站建设推广
  • 鄂州网站制作哪家好一个新的app如何推广
  • 制作网站找哪家好国外引流推广软件
  • 网站漂浮窗口代码免费p站推广网站入口
  • 如何搭建自己的网站服务器地址工业设计公司
  • 做解析会员电影的网站自动友链网
  • 网站建设小组实验报告口碑营销的方法
  • 爱网站在线观看免费品牌营销成功案例
  • 别人的做网站发表文章的平台有哪些
  • 网站的素材做logo免费私人网站建设软件
  • 宁波专业网站建设怎么做百度推广退款投诉
  • 自己在网站做邮箱网络营销是什么
  • 阿里云 wordpress rds内蒙古seo
  • 昆明做网站建设的公司哪家好seo专员岗位职责
  • wordpress全站关闭评论google中文搜索引擎
  • 不提供花架子网站 我长沙网站推广公司
  • 静态化网站的缺点单页网站模板
  • 网站设计建设方案站长之家seo查询官方网站
  • 网站开发初级技术人员免费网站外链推广
  • 方庄网站建设百度接单平台
  • 西安做网站商城的公司太原网站制作优化seo公司
  • 做暧暖网站百度榜单
  • 赤坎网站制作搜索推广渠道
  • java做门户网站百度人工客服电话是多少
  • 做牛仔的时尚网站怎么找关键词
  • 做网站域名备案需要多久推广有奖励的app平台