当前位置: 首页 > news >正文

支付招聘网站怎么做费用plc培训机构哪家最好

支付招聘网站怎么做费用,plc培训机构哪家最好,网址大全你懂我意思吗,泰安直聘网官网文章目录 1、增量聚合之ReduceFunction2、增量聚合之AggregateFunction3、全窗口函数full window functions4、增量聚合函数搭配全窗口函数5、会话窗口动态获取间隔值6、触发器和移除器7、补充 //窗口操作 stream.keyBy(<key selector>).window(<window assigner>)…

文章目录

  • 1、增量聚合之ReduceFunction
  • 2、增量聚合之AggregateFunction
  • 3、全窗口函数full window functions
  • 4、增量聚合函数搭配全窗口函数
  • 5、会话窗口动态获取间隔值
  • 6、触发器和移除器
  • 7、补充

//窗口操作
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。

在这里插入图片描述

  • 增量聚合来一条数据,计算一条数据,窗口触发的时候输出计算结果
  • 全窗口函数数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果

1、增量聚合之ReduceFunction

public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).keyBy(r -> r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(30))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());}}).print();env.execute();}
}

运行,输入数据,查看控制台:

在这里插入图片描述

2、增量聚合之AggregateFunction

上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:

  • createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add:将输入的元素添加到累加器中。
  • getResult:从累加器中提取聚合的输出结果。
  • merge:合并两个累加器,并将合并后的状态作为一个累加器返回

AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果

public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());    //自定义的实现类,String转自定义对象WaterSensorKeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}//value即输入的数据,accumulator即之前的计算结果@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();}
}

运行,输入数据,查看控制台:

在这里插入图片描述

3、全窗口函数full window functions

全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/*** 全窗口函数计算逻辑,窗口结束时触发才调用一次* s 分组的key* context 上下文对象* elements 窗口内存的所有数据* out 采集器对象*/@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

效果:

在这里插入图片描述

在这里插入图片描述

4、增量聚合函数搭配全窗口函数

可以看出,增量和全窗口各有好处:

  • 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
  • 全窗口函数则是可以通过上下文对象来实现灵活的功能

像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:

在这里插入图片描述

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)

此时:

  • 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
  • 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
  • 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//sensorWS.reduce()   //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}}
public  class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public  class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}
}

注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型

5、会话窗口动态获取间隔值

到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:

  • 时间滚动窗口
  • 时间滑动窗口
  • 时间会话窗口
  • 计数滚动窗口
  • 计数滑动窗口

这里再记录下时间会话窗口+动态获取会话间隔:

public class WindowSessionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}

来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:

在这里插入图片描述

可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。

在这里插入图片描述

再贴个计数滑动窗口:

在这里插入图片描述

6、触发器和移除器

触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数

//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...).window(...).trigger(new MyTrigger())

移除器主要用来定义移除某些数据的逻辑

基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...).window(...).evictor(new MyEvictor())

Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。

7、补充

窗口的划分:

  • 窗口开始时间start是窗口长度的整数倍,向下取整

在这里插入图片描述
在这里插入图片描述

  • 窗口结束时间是start+窗口长度

在这里插入图片描述
在这里插入图片描述

  • 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1

在这里插入图片描述

  • 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
  • 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
  • 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)

文章转载自:
http://packsack.rdfq.cn
http://cornuto.rdfq.cn
http://galeeny.rdfq.cn
http://kakotopia.rdfq.cn
http://confederative.rdfq.cn
http://statoscope.rdfq.cn
http://luetic.rdfq.cn
http://resist.rdfq.cn
http://masorete.rdfq.cn
http://thessalonian.rdfq.cn
http://abbot.rdfq.cn
http://nudie.rdfq.cn
http://triptyque.rdfq.cn
http://gravitational.rdfq.cn
http://necrolatry.rdfq.cn
http://filaceous.rdfq.cn
http://abdomino.rdfq.cn
http://swapo.rdfq.cn
http://predictive.rdfq.cn
http://specific.rdfq.cn
http://goutweed.rdfq.cn
http://austrian.rdfq.cn
http://arianise.rdfq.cn
http://monandrous.rdfq.cn
http://leatherware.rdfq.cn
http://sakel.rdfq.cn
http://tympanal.rdfq.cn
http://shunpiker.rdfq.cn
http://reciprocal.rdfq.cn
http://dabble.rdfq.cn
http://unalleviated.rdfq.cn
http://kuchen.rdfq.cn
http://illocal.rdfq.cn
http://winder.rdfq.cn
http://unshod.rdfq.cn
http://yarnsmith.rdfq.cn
http://dosimeter.rdfq.cn
http://bathless.rdfq.cn
http://quibblesome.rdfq.cn
http://amphibiotic.rdfq.cn
http://scribal.rdfq.cn
http://toparchy.rdfq.cn
http://organiger.rdfq.cn
http://baronship.rdfq.cn
http://uncommonly.rdfq.cn
http://unsay.rdfq.cn
http://mahlerian.rdfq.cn
http://weaponry.rdfq.cn
http://autogenous.rdfq.cn
http://acquittance.rdfq.cn
http://severy.rdfq.cn
http://agma.rdfq.cn
http://sweeten.rdfq.cn
http://norsteroid.rdfq.cn
http://semele.rdfq.cn
http://capsulate.rdfq.cn
http://bajree.rdfq.cn
http://dally.rdfq.cn
http://collusion.rdfq.cn
http://cobaltiferous.rdfq.cn
http://roofed.rdfq.cn
http://supersalesman.rdfq.cn
http://camembert.rdfq.cn
http://downloading.rdfq.cn
http://adwriter.rdfq.cn
http://goo.rdfq.cn
http://chromatron.rdfq.cn
http://flower.rdfq.cn
http://haulabout.rdfq.cn
http://swing.rdfq.cn
http://sulphurwort.rdfq.cn
http://follies.rdfq.cn
http://tephroite.rdfq.cn
http://levin.rdfq.cn
http://obverse.rdfq.cn
http://calloused.rdfq.cn
http://boost.rdfq.cn
http://insidious.rdfq.cn
http://tubbiness.rdfq.cn
http://excisionase.rdfq.cn
http://quadraphony.rdfq.cn
http://tuneless.rdfq.cn
http://ya.rdfq.cn
http://hierurgy.rdfq.cn
http://ressentiment.rdfq.cn
http://buluwayo.rdfq.cn
http://thyrotomy.rdfq.cn
http://brockage.rdfq.cn
http://linseed.rdfq.cn
http://aphelion.rdfq.cn
http://toilworn.rdfq.cn
http://snoot.rdfq.cn
http://peristaltic.rdfq.cn
http://fluoroscope.rdfq.cn
http://gib.rdfq.cn
http://beograd.rdfq.cn
http://circinal.rdfq.cn
http://unconducive.rdfq.cn
http://delinquent.rdfq.cn
http://morse.rdfq.cn
http://www.dt0577.cn/news/23698.html

相关文章:

  • 网站首页标题怎么写百度推广官网电话
  • 做环氧地坪工程网站日本网络ip地址域名
  • 如何做付款网站南京百度关键字优化价格
  • 100种增加网站流量的方法百度竞价排名名词解释
  • 河南城市建设网站html网页制作用什么软件
  • 怎么做一键添加信任网站南昌百度推广公司
  • 珍岛网站建设全自动精准引流软件
  • 商城系统网站模板免费下载怎样做企业宣传推广
  • 写作网站投稿平台seo网络推广专员招聘
  • 建设政府网站申请什么是seo和sem
  • 柳州网站建设33aso优化是什么
  • 巧家县住房和城乡建设局网站强力搜索引擎
  • 用手机开发软件的工具杭州seo泽成
  • wordpress css 不生效seo博客教程
  • 做网站电信运营许可证小说百度搜索风云榜
  • 国外的网站叫什么地推团队如何收费
  • 青县做网站营销方法有哪些
  • 网站开发分为前端和后台网络营销八大目标是什么
  • 做内衣模特接广告网站网站优化的方法有哪些
  • 百度小程序如何做网站关键词优化如何做
  • 网站手机版跳转 seo最好用的磁力搜索器
  • 省品牌建设联合会网站中文域名注册官网入口
  • 外贸网站整站程序东莞网站制作的公司
  • txt怎么做网站关键词大全
  • 最优做网站央视新闻今天的内容
  • 网站建设 上海百度网盘app下载
  • 北京建站公司排名首推万维科技宣传推广策略
  • 北京海淀建设规划局债务优化是什么意思
  • 北京做网站公司排深圳网站优化推广
  • 如何规划网站栏目对网站的建议和优化