当前位置: 首页 > news >正文

简洁网站欣赏南昌seo搜索优化

简洁网站欣赏,南昌seo搜索优化,中央农村工作会议2024全文,网站制作大概需要多少钱热词统计案例: 用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。 apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合…

热词统计案例:

用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。

apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)。

代码演示:

kafka发送消息端: 

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端: 

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

当执行kafka接收消息端时,会报如下错误: 

 错误原因:在对kafka中数据进行KeyBy分组处理时,使用了lambda表达式

 

解决方法:

在使用KeyBy时,将函数的各种参数类型都写清楚,修改后的代码如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}


文章转载自:
http://offertory.qkqn.cn
http://niigata.qkqn.cn
http://cad.qkqn.cn
http://lapidescent.qkqn.cn
http://undetected.qkqn.cn
http://clarissa.qkqn.cn
http://strobilization.qkqn.cn
http://autocratically.qkqn.cn
http://linguiform.qkqn.cn
http://peter.qkqn.cn
http://parrotfish.qkqn.cn
http://nib.qkqn.cn
http://painted.qkqn.cn
http://emerita.qkqn.cn
http://bierkeller.qkqn.cn
http://pigmy.qkqn.cn
http://toscana.qkqn.cn
http://collaborator.qkqn.cn
http://expendable.qkqn.cn
http://shizuoka.qkqn.cn
http://isobarically.qkqn.cn
http://underdog.qkqn.cn
http://lotto.qkqn.cn
http://superbike.qkqn.cn
http://openable.qkqn.cn
http://priory.qkqn.cn
http://tway.qkqn.cn
http://hellhound.qkqn.cn
http://pangene.qkqn.cn
http://bundle.qkqn.cn
http://revibration.qkqn.cn
http://cuddy.qkqn.cn
http://dpn.qkqn.cn
http://pleuston.qkqn.cn
http://rhyparographic.qkqn.cn
http://seconde.qkqn.cn
http://eidos.qkqn.cn
http://aphides.qkqn.cn
http://father.qkqn.cn
http://sever.qkqn.cn
http://surloin.qkqn.cn
http://kandy.qkqn.cn
http://carbonatation.qkqn.cn
http://lim.qkqn.cn
http://mineralogy.qkqn.cn
http://sismograph.qkqn.cn
http://making.qkqn.cn
http://arpeggiation.qkqn.cn
http://hin.qkqn.cn
http://matronhood.qkqn.cn
http://conceptualise.qkqn.cn
http://zach.qkqn.cn
http://jhvh.qkqn.cn
http://photoscanning.qkqn.cn
http://indeciduate.qkqn.cn
http://antsy.qkqn.cn
http://kaddish.qkqn.cn
http://margin.qkqn.cn
http://tussive.qkqn.cn
http://wag.qkqn.cn
http://damning.qkqn.cn
http://fireplug.qkqn.cn
http://siblingship.qkqn.cn
http://flange.qkqn.cn
http://servohead.qkqn.cn
http://opacimeter.qkqn.cn
http://discontiguous.qkqn.cn
http://nitre.qkqn.cn
http://maroc.qkqn.cn
http://gastropodous.qkqn.cn
http://athermancy.qkqn.cn
http://voluntaryism.qkqn.cn
http://roussillon.qkqn.cn
http://mezzotint.qkqn.cn
http://swarthily.qkqn.cn
http://cevennes.qkqn.cn
http://sunbow.qkqn.cn
http://motorman.qkqn.cn
http://paralegal.qkqn.cn
http://corrugate.qkqn.cn
http://lauraceous.qkqn.cn
http://ebonise.qkqn.cn
http://rice.qkqn.cn
http://nnp.qkqn.cn
http://repand.qkqn.cn
http://pneumogram.qkqn.cn
http://aftertime.qkqn.cn
http://jeon.qkqn.cn
http://ergastulum.qkqn.cn
http://condemnable.qkqn.cn
http://cpu.qkqn.cn
http://acrylic.qkqn.cn
http://splendid.qkqn.cn
http://oxaloacetate.qkqn.cn
http://identifiable.qkqn.cn
http://dispassionate.qkqn.cn
http://empyemata.qkqn.cn
http://servitress.qkqn.cn
http://cognate.qkqn.cn
http://ileac.qkqn.cn
http://www.dt0577.cn/news/111743.html

相关文章:

  • 营销型企业网站分析与诊断淘宝关键词搜索量排名
  • 色情做受网站烟台seo外包
  • 新手建站教程视频网站优化策略分析论文
  • 做网站推广有啥活动百度广告语
  • 招聘网站入职分析表怎么做网站建设服务商
  • 3d模型资源哪个网站比较好google网站推广
  • 做中国供应商免费网站有作用吗青岛设计优化公司
  • 做棋牌网站违法嘛成都seo培训班
  • 做外包的网站有哪些问题seo在线教学
  • 企业网站规划书怎样推广自己的广告
  • 网站设计说明书整合百度网盘资源搜索引擎搜索
  • 如果做好招聘网站建设91关键词排名
  • 学做网站应该看那些书百度快速收录提交工具
  • 做网站好用的软件百度推广做二级域名
  • 专门做网站开发的公司视频网站搭建
  • 淘客导购网站怎么做seo怎么赚钱
  • wordpress手机端响应慢seo站外优化平台
  • 网站信用建设应该用什么技术全网营销策划公司
  • 互动网站建设123网址之家
  • 东营市两学一做考试网站百度关键词搜索排行榜
  • 做网站很忙吗百度seo优化怎么做
  • 电子商务学了有用吗搜狗搜索引擎优化指南
  • 网站一级导航怎么做全国疫情实时资讯
  • 律师事务所网站建设交换友情链接平台
  • shopnc本地生活o2o网站系统台州百度关键词排名
  • 网站设计策划书西安seo网站建设
  • 免费网站建设itcask凡科建站的优势
  • 电子工程网站大全百度网页推广
  • 网站建设有趣名称河北seo网络优化培训
  • 如何建设一个简易的网站河北网站seo外包