当前位置: 首页 > news >正文

企联网登录网站优化排名优化

企联网登录,网站优化排名优化,网站建设php培训,wordpress 多多进宝目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】 1. RDD队列 a、使用场景:测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理 val sparkco…

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

http://www.dt0577.cn/news/19211.html

相关文章:

  • -邵阳网络公司-邵阳网站建设|邵阳网站制作零基础学seo要多久
  • 做网站公司 郑州西安网络推广运营公司
  • 给公司做门户网站 可以用凡客吗静态网页制作
  • 云服务器搭建网站教程厦门seo网络推广
  • 做后期的网站有哪些合肥优化营商环境
  • seo优化案例网站推广和网站优化
  • 上海正规网站建设怎么样武汉大学人民医院院长
  • 刷qq会员自己做网站要看网的域名是多少
  • 西安SEO网站推广怎么发布信息到百度
  • 怎么做网站免费的全网热搜榜
  • 做任务刷王者皮肤网站广告网络推广怎么做
  • wordpress 新浪微博插件专业的seo排名优化
  • 网站建设服务器百度推广开户费用标准
  • 盛泽做网站的市场营销渠道
  • WordPress和微信小程序系统优化app
  • 英雄联盟做的广告视频网站大数据平台
  • 兰州网站制作公司怎么在百度上发布信息
  • 易旅游网站建设珠海百度seo
  • 昆明公司建设网站制作足球积分排行榜最新
  • 广西桂林现在能去吗苏州seo网络推广
  • 微信网站模板大全百度自助建站官网
  • 做网站需不需要服务器互联网营销是干什么
  • 深圳企业网站制作设计长沙正规竞价优化服务
  • 怎么做购物平台网站网站建设深圳公司
  • 做阿里巴巴网站图片大全十大搜索引擎地址
  • 青岛商业网站建设附近广告公司
  • 企业wap网站源码超云seo优化
  • 网站建设与管理用什么软件有哪些seo提升关键词排名
  • 网络营销推广运营山东网络优化公司排名
  • 四川网站建设培训班短视频seo营销系统