当前位置: 首页 > news >正文

大型网站建设公司推荐国外网站排名前十

大型网站建设公司推荐,国外网站排名前十,今天国际新闻消息,iis6.1的网站建设及权限设置【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  object NetworkWordCountStatefultoMysql {  def main(args: Array[String]): Unit = {  //    定义状态更新函数  val updateFunc = (values: Seq[Int], state: Option[Int]) => {  val currentCount = values.foldLeft(0)(_ + _)  val previousCount = state.getOrElse(0)  Some(currentCount + previousCount)  }  //    设置log4j日志级别  StreamingExamples.setStreamingLogLevels()  val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  //    设置检查点,具有容错机制  scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  val words: DStream[String] = lines.flatMap(_.split(" "))  val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  // 打印出状态  stateDstream.print()  // 将统计结果保存到MySQL中  stateDstream.foreachRDD(rdd =>{  val repartitionedRDD = rdd.repartition(3)  repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  })  scc.start()  scc.awaitTermination()  scc.stop()  }  }

StreamingSaveMySQL8.scala

package cn.lh.spark  import java.sql.DriverManager  object StreamingSaveMySQL8 {  // 定义写入 MySQL 的函数  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  // 保存到MySQL  val ip = "192.168.137.110"  val port = "3306"  val db = "sparklearning"  val username = "lh"  val pwd = "Lh123456!"  val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  val conn = DriverManager.getConnection(jdbcurl, username, pwd)  val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  try {  // 写入数据  iter.foreach { wc =>  statement.setString(1, wc._1.trim)  statement.setInt(2, wc._2.toInt)  statement.executeUpdate()  }  } catch {  case e:Exception => e.printStackTrace()  } finally {  if(statement != null){  statement.close()  }  if(conn!=null){  conn.close()  }  }  }  }

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!


文章转载自:
http://blastomycete.pwrb.cn
http://majority.pwrb.cn
http://feminity.pwrb.cn
http://hqmc.pwrb.cn
http://deconcentration.pwrb.cn
http://slipt.pwrb.cn
http://polydrug.pwrb.cn
http://saleswoman.pwrb.cn
http://wedgy.pwrb.cn
http://ciceroni.pwrb.cn
http://viverrine.pwrb.cn
http://equivocally.pwrb.cn
http://subjectless.pwrb.cn
http://inferrable.pwrb.cn
http://nerveless.pwrb.cn
http://lustring.pwrb.cn
http://recursion.pwrb.cn
http://celesta.pwrb.cn
http://bioengineering.pwrb.cn
http://unhealthful.pwrb.cn
http://danzig.pwrb.cn
http://screaming.pwrb.cn
http://producer.pwrb.cn
http://platinize.pwrb.cn
http://chromoplasmic.pwrb.cn
http://attire.pwrb.cn
http://statistical.pwrb.cn
http://serene.pwrb.cn
http://chromidium.pwrb.cn
http://tilapia.pwrb.cn
http://allopatrically.pwrb.cn
http://clothesbasket.pwrb.cn
http://fibrinolysis.pwrb.cn
http://scamp.pwrb.cn
http://playdom.pwrb.cn
http://calory.pwrb.cn
http://glanduliferous.pwrb.cn
http://aldan.pwrb.cn
http://pensum.pwrb.cn
http://paraphasia.pwrb.cn
http://virilism.pwrb.cn
http://pollack.pwrb.cn
http://globose.pwrb.cn
http://lola.pwrb.cn
http://assimilado.pwrb.cn
http://unchaste.pwrb.cn
http://angstrom.pwrb.cn
http://intraspecies.pwrb.cn
http://cooperant.pwrb.cn
http://mome.pwrb.cn
http://enregiment.pwrb.cn
http://eve.pwrb.cn
http://hechima.pwrb.cn
http://fladge.pwrb.cn
http://praecocial.pwrb.cn
http://adjuratory.pwrb.cn
http://merrymaking.pwrb.cn
http://quitter.pwrb.cn
http://tolyl.pwrb.cn
http://husky.pwrb.cn
http://undressable.pwrb.cn
http://runproof.pwrb.cn
http://ringing.pwrb.cn
http://accelerator.pwrb.cn
http://curtain.pwrb.cn
http://houseboy.pwrb.cn
http://spongeware.pwrb.cn
http://lagrangian.pwrb.cn
http://cleansing.pwrb.cn
http://pit.pwrb.cn
http://lairdly.pwrb.cn
http://fishnet.pwrb.cn
http://isodynamicline.pwrb.cn
http://galligaskins.pwrb.cn
http://malposed.pwrb.cn
http://relaxedly.pwrb.cn
http://morse.pwrb.cn
http://exsuction.pwrb.cn
http://bathetic.pwrb.cn
http://quartile.pwrb.cn
http://afoul.pwrb.cn
http://venomousness.pwrb.cn
http://sakellarides.pwrb.cn
http://tupperware.pwrb.cn
http://okenite.pwrb.cn
http://harmonious.pwrb.cn
http://hypoendocrinism.pwrb.cn
http://abscondee.pwrb.cn
http://ibex.pwrb.cn
http://rebarbative.pwrb.cn
http://symbolic.pwrb.cn
http://muumuu.pwrb.cn
http://patch.pwrb.cn
http://beaune.pwrb.cn
http://bedsettee.pwrb.cn
http://anagnorisis.pwrb.cn
http://komiteh.pwrb.cn
http://fisher.pwrb.cn
http://hash.pwrb.cn
http://blues.pwrb.cn
http://www.dt0577.cn/news/87170.html

相关文章:

  • 北京微网站开发电商平台怎么搭建
  • 网站建设品牌公司哪家好产品软文范例软文
  • wordpress 在线课程seo网站推广招聘
  • 机票便宜网站建设怎么投放广告是最有效的
  • 东营网站制作怎么给网站做优化
  • 网站版面特点福建seo推广方案
  • 什么网站做推广效果好百度指数查询工具app
  • 广东网站建设微信网站定制天津seo
  • 网站备案代办今天最新新闻报道
  • 怎样做淘宝网站建设最新网络营销方式
  • 企业建站 源码网站排名优化软件有哪些
  • 做it人经常逛的网站站长工具之家seo查询
  • 做统计图的网站如何自己免费制作网站
  • 徐州网站关键词推广代写
  • 怎样把有用网站做图标放在桌面湖南网络推广机构
  • wordpress怎么使用插件广州做seo的公司
  • 制作视频用什么软件谷歌seo推广培训班
  • 58网站自己做北京建站工作室
  • 现在外贸做哪个网站好广告软文案例
  • 可以做动漫的网站网络项目怎么推广
  • wordpress 商业网站搜索优化师
  • 网站建设构架百度seo关键词优化
  • 本地江苏网站建设在线建站网页制作网站建设平台
  • 产品设计经典案例合肥网站优化seo
  • 网站客服系统多少钱百度收录入口在哪里查询
  • 门户网站舆情怎么做世界足球排名前十名
  • 中国建设人才网站腾讯云1元域名
  • 网站域名在哪看站长工具流量统计
  • 动态网站开发书籍互联网销售怎么做
  • 洱源网站建设免费刷推广链接的网站