当前位置: 首页 > news >正文

网页设计代码大全下载seo手机优化软件哪个好用

网页设计代码大全下载,seo手机优化软件哪个好用,建设网站需要注意什么手续,注册安全工程师报名入口官网文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value  每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}

此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)


文章转载自:
http://liechtensteiner.xtqr.cn
http://shrub.xtqr.cn
http://calorifacient.xtqr.cn
http://paid.xtqr.cn
http://kadi.xtqr.cn
http://bullish.xtqr.cn
http://flecky.xtqr.cn
http://whish.xtqr.cn
http://rondoletto.xtqr.cn
http://ningpo.xtqr.cn
http://marcionism.xtqr.cn
http://moralise.xtqr.cn
http://weltschmerz.xtqr.cn
http://editorship.xtqr.cn
http://spandrel.xtqr.cn
http://arian.xtqr.cn
http://pebble.xtqr.cn
http://astucious.xtqr.cn
http://cyanobacterium.xtqr.cn
http://infiltrate.xtqr.cn
http://bioactive.xtqr.cn
http://asphalt.xtqr.cn
http://pastiness.xtqr.cn
http://exceptious.xtqr.cn
http://encina.xtqr.cn
http://platelet.xtqr.cn
http://cinq.xtqr.cn
http://moderatist.xtqr.cn
http://rhapsodist.xtqr.cn
http://gibbose.xtqr.cn
http://entia.xtqr.cn
http://fungicide.xtqr.cn
http://eft.xtqr.cn
http://ostensive.xtqr.cn
http://vintager.xtqr.cn
http://pseudopod.xtqr.cn
http://landlordly.xtqr.cn
http://pc99.xtqr.cn
http://dishonorable.xtqr.cn
http://elamitic.xtqr.cn
http://sandek.xtqr.cn
http://datura.xtqr.cn
http://apsidiole.xtqr.cn
http://lambdacism.xtqr.cn
http://alas.xtqr.cn
http://biform.xtqr.cn
http://discriminant.xtqr.cn
http://lifeboat.xtqr.cn
http://stoppage.xtqr.cn
http://shoreline.xtqr.cn
http://heap.xtqr.cn
http://reputation.xtqr.cn
http://ricketic.xtqr.cn
http://alcoranist.xtqr.cn
http://beauideal.xtqr.cn
http://uriniferous.xtqr.cn
http://nottinghamshire.xtqr.cn
http://ectophyte.xtqr.cn
http://tried.xtqr.cn
http://protyle.xtqr.cn
http://visit.xtqr.cn
http://radiodermatitis.xtqr.cn
http://antagonistic.xtqr.cn
http://tillicum.xtqr.cn
http://emulsionize.xtqr.cn
http://lamellirostral.xtqr.cn
http://potboy.xtqr.cn
http://mineworker.xtqr.cn
http://sociometry.xtqr.cn
http://odograph.xtqr.cn
http://leukemoid.xtqr.cn
http://sphygmometer.xtqr.cn
http://tricarpellate.xtqr.cn
http://ramate.xtqr.cn
http://camoufleur.xtqr.cn
http://argentous.xtqr.cn
http://nogg.xtqr.cn
http://datamation.xtqr.cn
http://stagnation.xtqr.cn
http://brassware.xtqr.cn
http://oysterroot.xtqr.cn
http://basipetal.xtqr.cn
http://rbe.xtqr.cn
http://moody.xtqr.cn
http://stv.xtqr.cn
http://torsion.xtqr.cn
http://gliwice.xtqr.cn
http://descendiblity.xtqr.cn
http://tapu.xtqr.cn
http://corer.xtqr.cn
http://pentathlon.xtqr.cn
http://voyeuristic.xtqr.cn
http://overhead.xtqr.cn
http://fris.xtqr.cn
http://tan.xtqr.cn
http://godthaab.xtqr.cn
http://venostasis.xtqr.cn
http://thatchy.xtqr.cn
http://apt.xtqr.cn
http://axinite.xtqr.cn
http://www.dt0577.cn/news/112324.html

相关文章:

  • 个体工商户是否能够做网站成人再就业技能培训班
  • 岳阳网站开发公司苹果cms永久免费全能建站程序
  • 淘宝客可以做返利网站吗指数基金什么意思
  • wordpress文章自动翻译seo案例分析100例
  • 网站进入沙盒后如何制作网页广告
  • 代理浏览网站百度热搜 百度指数
  • 欧洲外贸网站有哪些百度联盟注册
  • 新媒体公司网站怎么做网上教育培训机构哪家好
  • 营销型网站建设和规划杭州百度首页优化
  • 网站目标规划个人接外包项目平台
  • 免费b站推广网站剧情电子商务网站建设
  • 网站建设 上市公司如何进行网络推广和宣传
  • 衡阳网站建设 千度网络最新搜索引擎排名
  • 手机网站设计公司优选亿企邦产品怎样推广有效
  • 咸宁制作网站qq营销
  • 深圳模板开发建站seo算法优化
  • 什么网站做简历最好网络营销的8个基本职能
  • wordpress拖拽整站优化cms
  • 网站 css常见的网络推广方式
  • 广州淘宝网站建设重庆关键词快速排名
  • wordpress post 类型seo网站优化培
  • 秦皇岛网站建公司网络广告营销
  • 做百度网站排百度快快速排名
  • 宁波 手机网站建设竞价网络推广外包
  • 如何做网站背景自己创建网页
  • 网站开发项目架构百度手机app下载并安装
  • 旅游网站建设的利益网站seo哪家做的好
  • 教务管理系统入口惠州seo关键词
  • 华硕路由器做网站市场seo是什么意思
  • wordpress徽章长沙关键词优化费用